.getILoggerFactory();
if (lc != null) {
for (ch.qos.logback.classic.Logger l : lc.getLoggerList()) {
- if (loggerName == null || l.getName().startsWith(loggerName)) {
+ if ((loggerName == null) || l.getName().startsWith(loggerName)) {
ci.println(retrieveLogLevel(l));
}
}
ci.println("Cache not supplied");
return;
}
+ int count = 0;
c = (ConcurrentMap<Object, Object>) this.icluster.getCache(containerName, cacheName);
if (c != null) {
for (Map.Entry<Object, Object> e : c.entrySet()) {
ci.println("Element " + entry.getKey() + "(hashCode="
+ entry.getKey().hashCode() + ") has value = (" + res
+ ")");
+ count++;
}
+ ci.println("Dumped " + count + " records");
} else {
ci.println("Cache " + cacheName + " on container " + containerName
+ " not existant!");
org.opendaylight.controller.sal.topology,
org.opendaylight.controller.sal.utils,
org.opendaylight.controller.sal.reader,
+ org.opendaylight.controller.clustering.services,
org.apache.commons.collections15,
org.opendaylight.controller.switchmanager,
org.opendaylight.controller.topologymanager,
edu.uci.ics.jung.algorithms.shortestpath,
edu.uci.ics.jung.graph.util,
org.apache.felix.dm,
+ org.osgi.framework,
+ org.apache.felix.service.command,
org.junit;resolution:=optional
</Import-Package>
<Bundle-Activator>
package org.opendaylight.controller.routing.dijkstra_implementation.internal;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.Hashtable;
+import java.util.Set;
+
import org.apache.felix.dm.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
import org.opendaylight.controller.sal.routing.IRouting;
import org.opendaylight.controller.switchmanager.ISwitchManager;
-import org.opendaylight.controller.sal.reader.IReadService;
import org.opendaylight.controller.topologymanager.ITopologyManager;
-import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
public class Activator extends ComponentActivatorAbstractBase {
protected static final Logger logger = LoggerFactory
* ComponentActivatorAbstractBase.
*
*/
+ @Override
public void init() {
}
* cleanup done by ComponentActivatorAbstractBase
*
*/
+ @Override
public void destroy() {
}
* instantiated in order to get an fully working implementation
* Object
*/
+ @Override
public Object[] getImplementations() {
Object[] res = { DijkstraImplementation.class };
return res;
* also optional per-container different behavior if needed, usually
* should not be the case though.
*/
- public void configureInstance(Component c, Object imp, String containerName) {
+ @Override
+ public void configureInstance(final Component c, final Object imp, final String containerName) {
if (imp.equals(DijkstraImplementation.class)) {
// export the service
- Dictionary<String, String> props = new Hashtable<String, String>();
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put("topoListenerName", "routing.Dijkstra");
- c.setInterface(new String[] { ITopologyManagerAware.class.getName(),
- IRouting.class.getName() }, props);
+
+ c.setInterface(new String[] { ITopologyManagerClusterWideAware.class.getName(), IRouting.class.getName() },
+ props);
// Now lets add a service dependency to make sure the
// provider of service exists
- c.add(createContainerServiceDependency(containerName).setService(
- IListenRoutingUpdates.class).setCallbacks(
- "setListenRoutingUpdates", "unsetListenRoutingUpdates")
+ c.add(createContainerServiceDependency(containerName).setService(IListenRoutingUpdates.class)
+ .setCallbacks("setListenRoutingUpdates", "unsetListenRoutingUpdates")
.setRequired(false));
- c.add(createContainerServiceDependency(containerName).setService(
- ISwitchManager.class).setCallbacks("setSwitchManager",
- "unsetSwitchManager").setRequired(true));
+ c.add(createContainerServiceDependency(containerName).setService(ISwitchManager.class)
+ .setCallbacks("setSwitchManager", "unsetSwitchManager")
+ .setRequired(true));
- c.add(createContainerServiceDependency(containerName).setService(
- ITopologyManager.class).setCallbacks("setTopologyManager",
- "unsetTopologyManager").setRequired(true));
+ c.add(createContainerServiceDependency(containerName).setService(ITopologyManager.class)
+ .setCallbacks("setTopologyManager", "unsetTopologyManager")
+ .setRequired(true));
- c.add(createContainerServiceDependency(containerName).setService(
- IReadService.class).setCallbacks("setReadService",
- "unsetReadService").setRequired(true));
+ c.add(createContainerServiceDependency(containerName).setService(IClusterContainerServices.class)
+ .setCallbacks("setClusterContainerService", "unsetClusterContainerService")
+ .setRequired(true));
}
}
+
+ @Override
+ protected Object[] getGlobalImplementations() {
+ final Object[] res = { DijkstraImplementationCLI.class };
+ return res;
+ }
}
*/
package org.opendaylight.controller.routing.dijkstra_implementation.internal;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.sal.core.Bandwidth;
import org.opendaylight.controller.sal.core.ConstructionException;
import org.opendaylight.controller.sal.core.Edge;
import org.opendaylight.controller.sal.core.Path;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.core.UpdateType;
-import org.opendaylight.controller.sal.reader.IReadService;
import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
-
import org.opendaylight.controller.sal.routing.IRouting;
import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
-import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
import edu.uci.ics.jung.algorithms.shortestpath.DijkstraShortestPath;
import edu.uci.ics.jung.graph.Graph;
import edu.uci.ics.jung.graph.SparseMultigraph;
import edu.uci.ics.jung.graph.util.EdgeType;
+
import java.lang.Exception;
import java.lang.IllegalArgumentException;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections15.Transformer;
-public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
- private static Logger log = LoggerFactory
- .getLogger(DijkstraImplementation.class);
+@SuppressWarnings("rawtypes")
+public class DijkstraImplementation implements IRouting, ITopologyManagerClusterWideAware {
+ private static Logger log = LoggerFactory.getLogger(DijkstraImplementation.class);
private ConcurrentMap<Short, Graph<Node, Edge>> topologyBWAware;
private ConcurrentMap<Short, DijkstraShortestPath<Node, Edge>> sptBWAware;
DijkstraShortestPath<Node, Edge> mtp; // Max Throughput Path
private Set<IListenRoutingUpdates> routingAware;
private ISwitchManager switchManager;
private ITopologyManager topologyManager;
- private IReadService readService;
private static final long DEFAULT_LINK_SPEED = Bandwidth.BW1Gbps;
+ private IClusterContainerServices clusterContainerService;
- public void setListenRoutingUpdates(IListenRoutingUpdates i) {
+ public void setListenRoutingUpdates(final IListenRoutingUpdates i) {
if (this.routingAware == null) {
this.routingAware = new HashSet<IListenRoutingUpdates>();
}
}
}
- public void unsetListenRoutingUpdates(IListenRoutingUpdates i) {
+ public void unsetListenRoutingUpdates(final IListenRoutingUpdates i) {
if (this.routingAware == null) {
return;
}
Transformer<Edge, ? extends Number> mtTransformer = null;
if (EdgeWeightMap == null) {
mtTransformer = new Transformer<Edge, Double>() {
+ @Override
public Double transform(Edge e) {
if (switchManager == null) {
log.error("switchManager is null");
log.error("srcNC:{} or dstNC:{} is null", srcNC, dstNC);
return (double) -1;
}
- Bandwidth bwSrc = (Bandwidth) switchManager
- .getNodeConnectorProp(srcNC,
- Bandwidth.BandwidthPropName);
- Bandwidth bwDst = (Bandwidth) switchManager
- .getNodeConnectorProp(dstNC,
- Bandwidth.BandwidthPropName);
+ Bandwidth bwSrc = (Bandwidth) switchManager.getNodeConnectorProp(srcNC,
+ Bandwidth.BandwidthPropName);
+ Bandwidth bwDst = (Bandwidth) switchManager.getNodeConnectorProp(dstNC,
+ Bandwidth.BandwidthPropName);
long srcLinkSpeed = 0, dstLinkSpeed = 0;
- if ((bwSrc == null)
- || ((srcLinkSpeed = bwSrc.getValue()) == 0)) {
- log.debug(
- "srcNC: {} - Setting srcLinkSpeed to Default!",
- srcNC);
+ if ((bwSrc == null) || ((srcLinkSpeed = bwSrc.getValue()) == 0)) {
+ log.debug("srcNC: {} - Setting srcLinkSpeed to Default!", srcNC);
srcLinkSpeed = DEFAULT_LINK_SPEED;
}
- if ((bwDst == null)
- || ((dstLinkSpeed = bwDst.getValue()) == 0)) {
- log.debug(
- "dstNC: {} - Setting dstLinkSpeed to Default!",
- dstNC);
+ if ((bwDst == null) || ((dstLinkSpeed = bwDst.getValue()) == 0)) {
+ log.debug("dstNC: {} - Setting dstLinkSpeed to Default!", dstNC);
dstLinkSpeed = DEFAULT_LINK_SPEED;
}
- long avlSrcThruPut = srcLinkSpeed
- - readService.getTransmitRate(srcNC);
- long avlDstThruPut = dstLinkSpeed
- - readService.getTransmitRate(dstNC);
-
- // Use lower of the 2 available thruput as the available
- // thruput
- long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut
- : avlDstThruPut;
+ // TODO: revisit the logic below with the real use case in
+ // mind
+ // For now we assume the throughput to be the speed of the
+ // link itself
+ // this kind of logic require information that should be
+ // polled by statistic manager and are not yet available,
+ // also this service at the moment is not used, so to be
+ // revisited later on
+ long avlSrcThruPut = srcLinkSpeed;
+ long avlDstThruPut = dstLinkSpeed;
+
+ // Use lower of the 2 available throughput as the available
+ // throughput
+ long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut : avlDstThruPut;
if (avlThruPut <= 0) {
- log.debug("Edge {}: Available Throughput {} <= 0!", e,
- avlThruPut);
+ log.debug("Edge {}: Available Throughput {} <= 0!", e, avlThruPut);
return (double) -1;
}
return (double) (Bandwidth.BW1Pbps / avlThruPut);
};
} else {
mtTransformer = new Transformer<Edge, Number>() {
+ @Override
public Number transform(Edge e) {
return EdgeWeightMap.get(e);
}
}
@Override
- public Path getRoute(Node src, Node dst) {
- if (src == null || dst == null) {
+ public Path getRoute(final Node src, final Node dst) {
+ if ((src == null) || (dst == null)) {
return null;
}
return getRoute(src, dst, (short) 0);
}
@Override
- public synchronized Path getRoute(Node src, Node dst, Short Bw) {
+ public synchronized Path getRoute(final Node src, final Node dst, final Short Bw) {
DijkstraShortestPath<Node, Edge> spt = this.sptBWAware.get(Bw);
- if (spt == null)
+ if (spt == null) {
return null;
+ }
List<Edge> path;
try {
path = spt.getPath(src, dst);
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private synchronized boolean updateTopo(Edge edge, Short bw, boolean added) {
+ @SuppressWarnings({ "unchecked" })
+ private synchronized boolean updateTopo(Edge edge, Short bw, UpdateType type) {
Graph<Node, Edge> topo = this.topologyBWAware.get(bw);
DijkstraShortestPath<Node, Edge> spt = this.sptBWAware.get(bw);
boolean edgePresentInGraph = false;
this.sptBWAware.put(bw, spt);
}
- if (added) {
+ switch (type) {
+ case ADDED:
// Make sure the vertex are there before adding the edge
topo.addVertex(src.getNode());
topo.addVertex(dst.getNode());
edgePresentInGraph = topo.containsEdge(edge);
if (edgePresentInGraph == false) {
try {
- topo.addEdge(new Edge(src, dst), src.getNode(),
- dst.getNode(), EdgeType.DIRECTED);
- } catch (ConstructionException e) {
+ topo.addEdge(new Edge(src, dst), src.getNode(), dst.getNode(), EdgeType.DIRECTED);
+ } catch (final ConstructionException e) {
log.error("", e);
return edgePresentInGraph;
}
}
- } else {
+ case CHANGED:
+ // Mainly raised only on properties update, so not really useful
+ // in this case
+ break;
+ case REMOVED:
// Remove the edge
try {
topo.removeEdge(new Edge(src, dst));
- } catch (ConstructionException e) {
+ } catch (final ConstructionException e) {
log.error("", e);
return edgePresentInGraph;
}
// If the src and dst vertex don't have incoming or
// outgoing links we can get ride of them
- if (topo.containsVertex(src.getNode())
- && topo.inDegree(src.getNode()) == 0
- && topo.outDegree(src.getNode()) == 0) {
+ if (topo.containsVertex(src.getNode()) && (topo.inDegree(src.getNode()) == 0)
+ && (topo.outDegree(src.getNode()) == 0)) {
log.debug("Removing vertex {}", src);
topo.removeVertex(src.getNode());
}
- if (topo.containsVertex(dst.getNode())
- && topo.inDegree(dst.getNode()) == 0
- && topo.outDegree(dst.getNode()) == 0) {
+ if (topo.containsVertex(dst.getNode()) && (topo.inDegree(dst.getNode()) == 0)
+ && (topo.outDegree(dst.getNode()) == 0)) {
log.debug("Removing vertex {}", dst);
topo.removeVertex(dst.getNode());
}
+ break;
}
spt.reset();
if (bw.equals(baseBW)) {
return edgePresentInGraph;
}
- private boolean edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
+ private boolean edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean local) {
String srcType = null;
String dstType = null;
- if (e == null || type == null) {
+ log.trace("Got an edgeUpdate: {} props: {} update type: {} local: {}", new Object[] { e, props, type, local });
+
+ if ((e == null) || (type == null)) {
log.error("Edge or Update type are null!");
return false;
} else {
Bandwidth bw = new Bandwidth(0);
boolean newEdge = false;
- if (props != null)
+ if (props != null) {
props.remove(bw);
-
- if (log.isDebugEnabled()) {
- log.debug("edgeUpdate: {} bw: {}", e, bw.getValue());
}
Short baseBW = Short.valueOf((short) 0);
- boolean add = (type == UpdateType.ADDED) ? true : false;
// Update base topo
- newEdge = !updateTopo(e, baseBW, add);
+ newEdge = !updateTopo(e, baseBW, type);
if (newEdge == true) {
if (bw.getValue() != baseBW) {
// Update BW topo
- updateTopo(e, (short) bw.getValue(), add);
+ updateTopo(e, (short) bw.getValue(), type);
}
}
return newEdge;
@Override
public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ log.trace("Start of a Bulk EdgeUpdate with " + topoedgeupdateList.size() + " elements");
boolean callListeners = false;
for (int i = 0; i < topoedgeupdateList.size(); i++) {
Edge e = topoedgeupdateList.get(i).getEdge();
- Set<Property> p = topoedgeupdateList.get(i).getProperty();
- UpdateType type = topoedgeupdateList.get(i).getUpdateType();
- if ((edgeUpdate(e, type, p)) && (!callListeners)) {
+ Set<Property> p = topoedgeupdateList.get(i)
+ .getProperty();
+ UpdateType type = topoedgeupdateList.get(i)
+ .getUpdateType();
+ boolean isLocal = topoedgeupdateList.get(i)
+ .isLocal();
+ if ((edgeUpdate(e, type, p, isLocal)) && (!callListeners)) {
callListeners = true;
}
}
- if ((callListeners) && (this.routingAware != null)) {
+
+ // The routing listeners should only be called on the coordinator, to
+ // avoid multiple controller cluster nodes to actually do the
+ // recalculation when only one need to react
+ boolean amICoordinator = true;
+ if (this.clusterContainerService != null) {
+ amICoordinator = this.clusterContainerService.amICoordinator();
+ }
+ if ((callListeners) && (this.routingAware != null) && amICoordinator) {
+ log.trace("Calling the routing listeners");
for (IListenRoutingUpdates ra : this.routingAware) {
try {
ra.recalculateDone();
}
}
}
+ log.trace("End of a Bulk EdgeUpdate");
}
/**
@SuppressWarnings({ "unchecked", "rawtypes" })
public void init() {
log.debug("Routing init() is called");
- this.topologyBWAware = (ConcurrentMap<Short, Graph<Node, Edge>>) new ConcurrentHashMap();
- this.sptBWAware = (ConcurrentMap<Short, DijkstraShortestPath<Node, Edge>>) new ConcurrentHashMap();
+ this.topologyBWAware = new ConcurrentHashMap<Short, Graph<Node, Edge>>();
+ this.sptBWAware = new ConcurrentHashMap<Short, DijkstraShortestPath<Node, Edge>>();
// Now create the default topology, which doesn't consider the
// BW, also create the corresponding Dijkstra calculation
Graph<Node, Edge> g = new SparseMultigraph();
log.debug("Routing stop() is called");
}
- @Override
- public void edgeOverUtilized(Edge edge) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void edgeUtilBackToNormal(Edge edge) {
- // TODO Auto-generated method stub
-
- }
-
public void setSwitchManager(ISwitchManager switchManager) {
this.switchManager = switchManager;
}
}
}
- public void setReadService(IReadService readService) {
- this.readService = readService;
- }
-
- public void unsetReadService(IReadService readService) {
- if (this.readService == readService) {
- this.readService = null;
- }
- }
-
public void setTopologyManager(ITopologyManager tm) {
this.topologyManager = tm;
}
this.topologyManager = null;
}
}
+
+ void setClusterContainerService(IClusterContainerServices s) {
+ log.debug("Cluster Service set");
+ this.clusterContainerService = s;
+ }
+
+ void unsetClusterContainerService(IClusterContainerServices s) {
+ if (this.clusterContainerService == s) {
+ log.debug("Cluster Service removed!");
+ this.clusterContainerService = null;
+ }
+ }
+
+ @Override
+ public void edgeOverUtilized(Edge edge) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void edgeUtilBackToNormal(Edge edge) {
+ // TODO Auto-generated method stub
+
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.routing.dijkstra_implementation.internal;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.service.command.Descriptor;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.core.Path;
+import org.opendaylight.controller.sal.routing.IRouting;
+import org.opendaylight.controller.sal.utils.ServiceHelper;
+import org.osgi.framework.ServiceRegistration;
+
+public class DijkstraImplementationCLI {
+ @SuppressWarnings("rawtypes")
+ private ServiceRegistration sr = null;
+
+ public void init() {
+ }
+
+ public void destroy() {
+ }
+
+ public void start() {
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("osgi.command.scope", "odpcontroller");
+ props.put("osgi.command.function", new String[] { "getRoute" });
+ this.sr = ServiceHelper.registerGlobalServiceWReg(DijkstraImplementationCLI.class, this, props);
+ }
+
+ public void stop() {
+ if (this.sr != null) {
+ this.sr.unregister();
+ this.sr = null;
+ }
+ }
+
+ @Descriptor("Retrieves a Route between two Nodes in the discovered Topology DB")
+ public void getRoute(
+ @Descriptor("Container on the context of which the routing service need to be looked up") String container,
+ @Descriptor("String representation of the Source Node, this need to be consumable from Node.fromString()") String srcNode,
+ @Descriptor("String representation of the Destination Node") String dstNode) {
+ final IRouting r = (IRouting) ServiceHelper.getInstance(IRouting.class, container, this);
+
+ if (r == null) {
+ System.out.println("Cannot find the routing instance on container:" + container);
+ return;
+ }
+
+ final Node src = Node.fromString(srcNode);
+ final Node dst = Node.fromString(dstNode);
+ final Path p = r.getRoute(src, dst);
+ if (p != null) {
+ System.out.println("Route between srcNode:" + src + " and dstNode:" + dst + " = " + p);
+ } else {
+ System.out.println("There is no route between srcNode:" + src + " and dstNode:" + dst);
+ }
+ }
+}
/**
* The class represents an Edge, the Edge's Property Set and its UpdateType.
*/
-
public class TopoEdgeUpdate {
private Edge edge;
private Set<Property> props;
private UpdateType type;
+ private boolean isLocal;
+ /**
+ * Constructor for a topology element update. A TopologyElementUpdate is an
+ * object that summarize what has happened on an Edge and if the update is
+ * generated locally to this controller or no
+ *
+ * @param e
+ * Edge being updated
+ * @param p
+ * Set of Properties attached to the edge
+ * @param t
+ * Type of update
+ */
public TopoEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
edge = e;
props = p;
type = t;
+ setLocal(true);
}
public Edge getEdge() {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((edge == null) ? 0 : edge.hashCode());
- result = prime * result + ((props == null) ? 0 : props.hashCode());
- result = prime * result + ((type == null) ? 0 : type.hashCode());
+ result = (prime * result) + ((edge == null) ? 0 : edge.hashCode());
+ result = (prime * result) + ((type == null) ? 0 : type.hashCode());
return result;
}
@Override
public String toString() {
- return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type="
- + type + "]";
+ return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type=" + type + ", isLocal=" + isLocal + "]";
}
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
TopoEdgeUpdate other = (TopoEdgeUpdate) obj;
if (edge == null) {
- if (other.edge != null)
- return false;
- } else if (!edge.equals(other.edge))
- return false;
- if (props == null) {
- if (other.props != null)
+ if (other.edge != null) {
return false;
- } else if (!props.equals(other.props))
+ }
+ } else if (!edge.equals(other.edge)) {
return false;
- if (type != other.type)
+ }
+ if (type != other.type) {
return false;
+ }
return true;
}
+
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ public void setLocal(boolean isLocal) {
+ this.isLocal = isLocal;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.topologymanager;
+
+public interface ITopologyManagerClusterWideAware extends ITopologyManagerAware {
+}
import java.util.Set;
import org.apache.felix.dm.Component;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
import org.opendaylight.controller.sal.topology.ITopologyService;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* ComponentActivatorAbstractBase.
*
*/
+ @Override
public void init() {
}
* cleanup done by ComponentActivatorAbstractBase
*
*/
+ @Override
public void destroy() {
}
* instantiated in order to get an fully working implementation
* Object
*/
+ @Override
public Object[] getImplementations() {
Object[] res = { TopologyManagerImpl.class };
return res;
* also optional per-container different behavior if needed, usually
* should not be the case though.
*/
+ @Override
public void configureInstance(Component c, Object imp, String containerName) {
if (imp.equals(TopologyManagerImpl.class)) {
// export the service needed to listen to topology updates
Dictionary<String, Set<String>> props = new Hashtable<String, Set<String>>();
Set<String> propSet = new HashSet<String>();
- propSet.add("topologymanager.configSaveEvent");
+ propSet.add(TopologyManagerImpl.TOPOEDGESDB);
props.put("cachenames", propSet);
c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
ITopologyManager.class.getName(),
- IConfigurationContainerAware.class.getName() }, props);
+ IConfigurationContainerAware.class.getName(),
+ ICacheUpdateAware.class.getName() }, props);
c.add(createContainerServiceDependency(containerName).setService(
ITopologyService.class).setCallbacks("setTopoService",
"setTopologyManagerAware", "unsetTopologyManagerAware")
.setRequired(false));
+ // These are all the listeners for a topology manager for the
+ // cluster wide events
+ // updates, there could be many or none
+ c.add(createContainerServiceDependency(containerName).setService(ITopologyManagerClusterWideAware.class)
+ .setCallbacks("setTopologyManagerClusterWideAware", "unsetTopologyManagerClusterWideAware")
+ .setRequired(false));
+
c.add(createContainerServiceDependency(containerName).setService(
IClusterContainerServices.class).setCallbacks(
"setClusterContainerService",
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.felix.dm.Component;
import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
* network topology. It provides service for applications to interact with
* topology database and notifies all the listeners of topology changes.
*/
-public class TopologyManagerImpl implements ITopologyManager,
-IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
-CommandProvider {
+public class TopologyManagerImpl implements
+ ICacheUpdateAware,
+ ITopologyManager,
+ IConfigurationContainerAware,
+ IListenTopoUpdates,
+ IObjectReader,
+ CommandProvider {
+ static final String TOPOEDGESDB = "topologymanager.edgesDB";
+ static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+ static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+ static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
private static final String SAVE = "Save";
private ITopologyService topoService;
// DB of all the NodeConnectors with an Host attached to it
private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
// Topology Manager Aware listeners
- private Set<ITopologyManagerAware> topologyManagerAware =
- new CopyOnWriteArraySet<ITopologyManagerAware>();;
-
+ private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+ // Topology Manager Aware listeners - for clusterwide updates
+ private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+ new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
private static String ROOT = GlobalConstants.STARTUPHOME.toString();
private String userLinksFileName;
private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
- private ConcurrentMap<Long, String> configSaveEvent;
+ private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private volatile Boolean shuttingDown = false;
+ private Thread notifyThread;
void nonClusterObjectCreate() {
hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
- configSaveEvent = new ConcurrentHashMap<Long, String>();
}
void setTopologyManagerAware(ITopologyManagerAware s) {
}
}
+ void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+ if (this.topologyManagerClusterWideAware != null) {
+ log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+ this.topologyManagerClusterWideAware.add(s);
+ }
+ }
+
+ void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+ if (this.topologyManagerClusterWideAware != null) {
+ log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+ this.topologyManagerClusterWideAware.remove(s);
+ }
+ }
+
void setTopoService(ITopologyService s) {
log.debug("Adding ITopologyService: {}", s);
this.topoService = s;
*
*/
void init(Component c) {
-
allocateCaches();
retrieveCaches();
-
String containerName = null;
Dictionary<?, ?> props = c.getServiceProperties();
if (props != null) {
userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
registerWithOSGIConsole();
loadConfiguration();
+ // Restore the shuttingDown status on init of the component
+ shuttingDown = false;
+ notifyThread = new Thread(new TopologyNotify(notifyQ));
}
@SuppressWarnings({ "unchecked", "deprecation" })
- private void allocateCaches(){
- if (this.clusterContainerService == null) {
- nonClusterObjectCreate();
- log.error("Cluster Services unavailable, allocated non-cluster caches!");
- return;
- }
-
+ private void allocateCaches() {
try {
- this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(
- "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.edgesDB =
+ (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode");
+ log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
}
try {
- this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
- .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.hostsDB =
+ (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
+ TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode");
+ log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
}
try {
- this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
- .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.nodeConnectorsDB =
+ (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+ TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode");
+ log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
}
try {
- this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
- .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.userLinksDB =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+ TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode");
+ log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
}
-
- try {
- this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
- .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode");
- }
-
}
@SuppressWarnings({ "unchecked", "deprecation" })
return;
}
- this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
- .getCache("topologymanager.edgesDB");
+ this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
if (edgesDB == null) {
- log.error("Failed to get cache for topologymanager.edgesDB");
+ log.error("Failed to get cache for " + TOPOEDGESDB);
}
- this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
- .getCache("topologymanager.hostsDB");
+ this.hostsDB =
+ (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
if (hostsDB == null) {
- log.error("Failed to get cache for topologymanager.hostsDB");
+ log.error("Failed to get cache for " + TOPOHOSTSDB);
}
- this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
- .getCache("topologymanager.nodeConnectorDB");
+ this.nodeConnectorsDB =
+ (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
if (nodeConnectorsDB == null) {
- log.error("Failed to get cache for topologymanager.nodeConnectorDB");
+ log.error("Failed to get cache for " + TOPONODECONNECTORDB);
}
- this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
- .getCache("topologymanager.userLinksDB");
+ this.userLinksDB =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
if (userLinksDB == null) {
- log.error("Failed to get cache for topologymanager.userLinksDB");
+ log.error("Failed to get cache for " + TOPOUSERLINKSDB);
}
-
- this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
- .getCache("topologymanager.configSaveEvent");
- if (configSaveEvent == null) {
- log.error("Failed to get cache for topologymanager.configSaveEvent");
- }
-
}
/**
*
*/
void started() {
+ // Start the batcher thread for the cluster wide topology updates
+ notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
// time it may sollicit refresh too soon.
log.debug("Sollicit topology refresh");
topoService.sollicitRefresh();
}
+ void stop() {
+ shuttingDown = true;
+ notifyThread.interrupt();
+ }
+
/**
* Function called by the dependency manager when at least one dependency
* become unsatisfied or when the component is shutting down because for
*
*/
void destroy() {
+ notifyQ.clear();
+ notifyThread = null;
}
@SuppressWarnings("unchecked")
@Override
public Status saveConfig() {
- // Publish the save config event to the cluster
- configSaveEvent.put(new Date().getTime(), SAVE );
return saveConfigInternal();
}
@Override
public Host getHostAttachedToNodeConnector(NodeConnector port) {
ImmutablePair<Host, Set<Property>> host;
- if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) {
+ if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
return null;
}
return host.getLeft();
TopologyUserLinkConfig link = userLinksDB.remove(linkName);
Edge linkTuple;
- if (link != null && (linkTuple = getLinkTuple(link)) != null) {
+ if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
if (! isProductionLink(linkTuple)) {
edgeUpdate(linkTuple, UpdateType.REMOVED, null);
}
log.warn("Link Utilization back to normal: {}", edge);
}
+ private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+ TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+ upd.setLocal(isLocal);
+ notifyQ.add(upd);
+ }
+
+ @Override
+ public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ // This is the case of an Edge being added to the topology DB
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+ }
+ }
+
+ @Override
+ public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+ final Set<Property> props = (Set<Property>) new_value;
+ edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+ }
+ }
+
+ @Override
+ public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+ }
+ }
+
+ class TopologyNotify implements Runnable {
+ private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+ private TopoEdgeUpdate entry;
+ private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+ private boolean notifyListeners;
+
+ TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+ this.notifyQ = notifyQ;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ log.trace("New run of TopologyNotify");
+ notifyListeners = false;
+ // First we block waiting for an element to get in
+ entry = notifyQ.take();
+ // Then we drain the whole queue if elements are
+ // in it without getting into any blocking condition
+ for (; entry != null; entry = notifyQ.poll()) {
+ teuList.add(entry);
+ notifyListeners = true;
+ }
+
+ // Notify listeners only if there were updates drained else
+ // give up
+ if (notifyListeners) {
+ log.trace("Notifier thread, notified a listener");
+ // Now update the listeners
+ for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+ try {
+ s.edgeUpdate(teuList);
+ } catch (Exception exc) {
+ log.error("Exception on edge update:", exc);
+ }
+ }
+ }
+ teuList.clear();
+
+ // Lets sleep for sometime to allow aggregation of event
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ log.warn("TopologyNotify interrupted {}", e1.getMessage());
+ if (shuttingDown) {
+ return;
+ }
+ } catch (Exception e2) {
+ log.error("", e2);
+ }
+ }
+ }
+ }
}