From 8984fee53ee5f4a99eed43b01ad286ba0e781dd6 Mon Sep 17 00:00:00 2001 From: Giovanni Meo Date: Thu, 8 Aug 2013 19:43:38 +0200 Subject: [PATCH] Implement cluster wide topology notifications and let routing use it - Made TopologyManager to generate ClusterWide updates along with the local ones. - Implemented a mechanism in topology manager to batch the topology updates being synched via clustering services. This would save on unnecessary churn in recalculations under massive topology updates. - Modified routing Dijkstra to be an ITopologyManagerClusterWideAware client rather than on of ITopologyManagerAware. - Modified Dijkstra implementation to generate routing updates notifications only on the coordinator, in fact given Dijkstra will have the same view on all the controllers node in the cluster, then it's pointless to have all the clients of IRoutingAware to recalculate, they would lead to the same result. - Dijkstra edgeUpdate logic was considering the CHANGED topology event as DELETE, that is wrong and this patch fix it. - Added gogo shell CLI to get a route from the DijkstraImplementation.java - Enhanced "TopoEdgeUpdate" class to has the isLocal flag to distinguish between the local updates and the remotes one. Also remove non-key field from equal and hashCode calculation. - Remove unnecessary CONFIGSAVEEVENT in TopologyManagerImpl because now the configuration service provides a cluster wide trigger. Change-Id: Ia74d9d1ec0731e1f5815a69edc25bbb5b4c1f531 Signed-off-by: Giovanni Meo --- .../test/internal/SimpleClient.java | 5 +- .../routing/dijkstra_implementation/pom.xml | 3 + .../internal/Activator.java | 51 ++-- .../internal/DijkstraImplementation.java | 200 ++++++++------- .../internal/DijkstraImplementationCLI.java | 65 +++++ .../sal/topology/TopoEdgeUpdate.java | 54 ++-- .../ITopologyManagerClusterWideAware.java | 11 + .../topologymanager/internal/Activator.java | 18 +- .../internal/TopologyManagerImpl.java | 236 +++++++++++++----- 9 files changed, 447 insertions(+), 196 deletions(-) create mode 100644 opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java create mode 100644 opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java diff --git a/opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java b/opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java index 0a0a9d8828..f681c35c8c 100644 --- a/opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java +++ b/opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java @@ -151,7 +151,7 @@ public class SimpleClient implements CommandProvider { .getILoggerFactory(); if (lc != null) { for (ch.qos.logback.classic.Logger l : lc.getLoggerList()) { - if (loggerName == null || l.getName().startsWith(loggerName)) { + if ((loggerName == null) || l.getName().startsWith(loggerName)) { ci.println(retrieveLogLevel(l)); } } @@ -382,6 +382,7 @@ public class SimpleClient implements CommandProvider { ci.println("Cache not supplied"); return; } + int count = 0; c = (ConcurrentMap) this.icluster.getCache(containerName, cacheName); if (c != null) { for (Map.Entry e : c.entrySet()) { @@ -394,7 +395,9 @@ public class SimpleClient implements CommandProvider { ci.println("Element " + entry.getKey() + "(hashCode=" + entry.getKey().hashCode() + ") has value = (" + res + ")"); + count++; } + ci.println("Dumped " + count + " records"); } else { ci.println("Cache " + cacheName + " on container " + containerName + " not existant!"); diff --git a/opendaylight/routing/dijkstra_implementation/pom.xml b/opendaylight/routing/dijkstra_implementation/pom.xml index 0e45e06cdf..c0f7afa65c 100644 --- a/opendaylight/routing/dijkstra_implementation/pom.xml +++ b/opendaylight/routing/dijkstra_implementation/pom.xml @@ -28,6 +28,7 @@ org.opendaylight.controller.sal.topology, org.opendaylight.controller.sal.utils, org.opendaylight.controller.sal.reader, + org.opendaylight.controller.clustering.services, org.apache.commons.collections15, org.opendaylight.controller.switchmanager, org.opendaylight.controller.topologymanager, @@ -35,6 +36,8 @@ edu.uci.ics.jung.algorithms.shortestpath, edu.uci.ics.jung.graph.util, org.apache.felix.dm, + org.osgi.framework, + org.apache.felix.service.command, org.junit;resolution:=optional diff --git a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/Activator.java b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/Activator.java index 3cd696854c..e25c34c75f 100644 --- a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/Activator.java +++ b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/Activator.java @@ -10,18 +10,21 @@ package org.opendaylight.controller.routing.dijkstra_implementation.internal; import java.util.Dictionary; +import java.util.HashSet; import java.util.Hashtable; +import java.util.Set; + import org.apache.felix.dm.Component; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase; import org.opendaylight.controller.sal.routing.IListenRoutingUpdates; import org.opendaylight.controller.sal.routing.IRouting; import org.opendaylight.controller.switchmanager.ISwitchManager; -import org.opendaylight.controller.sal.reader.IReadService; import org.opendaylight.controller.topologymanager.ITopologyManager; -import org.opendaylight.controller.topologymanager.ITopologyManagerAware; +import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; +import org.opendaylight.controller.clustering.services.IClusterContainerServices; public class Activator extends ComponentActivatorAbstractBase { protected static final Logger logger = LoggerFactory @@ -33,6 +36,7 @@ public class Activator extends ComponentActivatorAbstractBase { * ComponentActivatorAbstractBase. * */ + @Override public void init() { } @@ -41,6 +45,7 @@ public class Activator extends ComponentActivatorAbstractBase { * cleanup done by ComponentActivatorAbstractBase * */ + @Override public void destroy() { } @@ -54,6 +59,7 @@ public class Activator extends ComponentActivatorAbstractBase { * instantiated in order to get an fully working implementation * Object */ + @Override public Object[] getImplementations() { Object[] res = { DijkstraImplementation.class }; return res; @@ -72,32 +78,39 @@ public class Activator extends ComponentActivatorAbstractBase { * also optional per-container different behavior if needed, usually * should not be the case though. */ - public void configureInstance(Component c, Object imp, String containerName) { + @Override + public void configureInstance(final Component c, final Object imp, final String containerName) { if (imp.equals(DijkstraImplementation.class)) { // export the service - Dictionary props = new Hashtable(); + final Dictionary props = new Hashtable(); props.put("topoListenerName", "routing.Dijkstra"); - c.setInterface(new String[] { ITopologyManagerAware.class.getName(), - IRouting.class.getName() }, props); + + c.setInterface(new String[] { ITopologyManagerClusterWideAware.class.getName(), IRouting.class.getName() }, + props); // Now lets add a service dependency to make sure the // provider of service exists - c.add(createContainerServiceDependency(containerName).setService( - IListenRoutingUpdates.class).setCallbacks( - "setListenRoutingUpdates", "unsetListenRoutingUpdates") + c.add(createContainerServiceDependency(containerName).setService(IListenRoutingUpdates.class) + .setCallbacks("setListenRoutingUpdates", "unsetListenRoutingUpdates") .setRequired(false)); - c.add(createContainerServiceDependency(containerName).setService( - ISwitchManager.class).setCallbacks("setSwitchManager", - "unsetSwitchManager").setRequired(true)); + c.add(createContainerServiceDependency(containerName).setService(ISwitchManager.class) + .setCallbacks("setSwitchManager", "unsetSwitchManager") + .setRequired(true)); - c.add(createContainerServiceDependency(containerName).setService( - ITopologyManager.class).setCallbacks("setTopologyManager", - "unsetTopologyManager").setRequired(true)); + c.add(createContainerServiceDependency(containerName).setService(ITopologyManager.class) + .setCallbacks("setTopologyManager", "unsetTopologyManager") + .setRequired(true)); - c.add(createContainerServiceDependency(containerName).setService( - IReadService.class).setCallbacks("setReadService", - "unsetReadService").setRequired(true)); + c.add(createContainerServiceDependency(containerName).setService(IClusterContainerServices.class) + .setCallbacks("setClusterContainerService", "unsetClusterContainerService") + .setRequired(true)); } } + + @Override + protected Object[] getGlobalImplementations() { + final Object[] res = { DijkstraImplementationCLI.class }; + return res; + } } diff --git a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementation.java b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementation.java index 3a0faa91d4..a8ef381528 100644 --- a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementation.java +++ b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementation.java @@ -16,6 +16,7 @@ */ package org.opendaylight.controller.routing.dijkstra_implementation.internal; +import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.sal.core.Bandwidth; import org.opendaylight.controller.sal.core.ConstructionException; import org.opendaylight.controller.sal.core.Edge; @@ -24,19 +25,18 @@ import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.core.Path; import org.opendaylight.controller.sal.core.Property; import org.opendaylight.controller.sal.core.UpdateType; -import org.opendaylight.controller.sal.reader.IReadService; import org.opendaylight.controller.sal.routing.IListenRoutingUpdates; - import org.opendaylight.controller.sal.routing.IRouting; import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; -import org.opendaylight.controller.topologymanager.ITopologyManagerAware; +import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; import edu.uci.ics.jung.algorithms.shortestpath.DijkstraShortestPath; import edu.uci.ics.jung.graph.Graph; import edu.uci.ics.jung.graph.SparseMultigraph; import edu.uci.ics.jung.graph.util.EdgeType; + import java.lang.Exception; import java.lang.IllegalArgumentException; import java.util.HashSet; @@ -47,23 +47,24 @@ import java.util.Set; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.collections15.Transformer; -public class DijkstraImplementation implements IRouting, ITopologyManagerAware { - private static Logger log = LoggerFactory - .getLogger(DijkstraImplementation.class); +@SuppressWarnings("rawtypes") +public class DijkstraImplementation implements IRouting, ITopologyManagerClusterWideAware { + private static Logger log = LoggerFactory.getLogger(DijkstraImplementation.class); private ConcurrentMap> topologyBWAware; private ConcurrentMap> sptBWAware; DijkstraShortestPath mtp; // Max Throughput Path private Set routingAware; private ISwitchManager switchManager; private ITopologyManager topologyManager; - private IReadService readService; private static final long DEFAULT_LINK_SPEED = Bandwidth.BW1Gbps; + private IClusterContainerServices clusterContainerService; - public void setListenRoutingUpdates(IListenRoutingUpdates i) { + public void setListenRoutingUpdates(final IListenRoutingUpdates i) { if (this.routingAware == null) { this.routingAware = new HashSet(); } @@ -73,7 +74,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { } } - public void unsetListenRoutingUpdates(IListenRoutingUpdates i) { + public void unsetListenRoutingUpdates(final IListenRoutingUpdates i) { if (this.routingAware == null) { return; } @@ -95,6 +96,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { Transformer mtTransformer = null; if (EdgeWeightMap == null) { mtTransformer = new Transformer() { + @Override public Double transform(Edge e) { if (switchManager == null) { log.error("switchManager is null"); @@ -106,43 +108,39 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { log.error("srcNC:{} or dstNC:{} is null", srcNC, dstNC); return (double) -1; } - Bandwidth bwSrc = (Bandwidth) switchManager - .getNodeConnectorProp(srcNC, - Bandwidth.BandwidthPropName); - Bandwidth bwDst = (Bandwidth) switchManager - .getNodeConnectorProp(dstNC, - Bandwidth.BandwidthPropName); + Bandwidth bwSrc = (Bandwidth) switchManager.getNodeConnectorProp(srcNC, + Bandwidth.BandwidthPropName); + Bandwidth bwDst = (Bandwidth) switchManager.getNodeConnectorProp(dstNC, + Bandwidth.BandwidthPropName); long srcLinkSpeed = 0, dstLinkSpeed = 0; - if ((bwSrc == null) - || ((srcLinkSpeed = bwSrc.getValue()) == 0)) { - log.debug( - "srcNC: {} - Setting srcLinkSpeed to Default!", - srcNC); + if ((bwSrc == null) || ((srcLinkSpeed = bwSrc.getValue()) == 0)) { + log.debug("srcNC: {} - Setting srcLinkSpeed to Default!", srcNC); srcLinkSpeed = DEFAULT_LINK_SPEED; } - if ((bwDst == null) - || ((dstLinkSpeed = bwDst.getValue()) == 0)) { - log.debug( - "dstNC: {} - Setting dstLinkSpeed to Default!", - dstNC); + if ((bwDst == null) || ((dstLinkSpeed = bwDst.getValue()) == 0)) { + log.debug("dstNC: {} - Setting dstLinkSpeed to Default!", dstNC); dstLinkSpeed = DEFAULT_LINK_SPEED; } - long avlSrcThruPut = srcLinkSpeed - - readService.getTransmitRate(srcNC); - long avlDstThruPut = dstLinkSpeed - - readService.getTransmitRate(dstNC); - - // Use lower of the 2 available thruput as the available - // thruput - long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut - : avlDstThruPut; + // TODO: revisit the logic below with the real use case in + // mind + // For now we assume the throughput to be the speed of the + // link itself + // this kind of logic require information that should be + // polled by statistic manager and are not yet available, + // also this service at the moment is not used, so to be + // revisited later on + long avlSrcThruPut = srcLinkSpeed; + long avlDstThruPut = dstLinkSpeed; + + // Use lower of the 2 available throughput as the available + // throughput + long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut : avlDstThruPut; if (avlThruPut <= 0) { - log.debug("Edge {}: Available Throughput {} <= 0!", e, - avlThruPut); + log.debug("Edge {}: Available Throughput {} <= 0!", e, avlThruPut); return (double) -1; } return (double) (Bandwidth.BW1Pbps / avlThruPut); @@ -150,6 +148,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { }; } else { mtTransformer = new Transformer() { + @Override public Number transform(Edge e) { return EdgeWeightMap.get(e); } @@ -166,8 +165,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { } @Override - public Path getRoute(Node src, Node dst) { - if (src == null || dst == null) { + public Path getRoute(final Node src, final Node dst) { + if ((src == null) || (dst == null)) { return null; } return getRoute(src, dst, (short) 0); @@ -198,10 +197,11 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { } @Override - public synchronized Path getRoute(Node src, Node dst, Short Bw) { + public synchronized Path getRoute(final Node src, final Node dst, final Short Bw) { DijkstraShortestPath spt = this.sptBWAware.get(Bw); - if (spt == null) + if (spt == null) { return null; + } List path; try { path = spt.getPath(src, dst); @@ -238,8 +238,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private synchronized boolean updateTopo(Edge edge, Short bw, boolean added) { + @SuppressWarnings({ "unchecked" }) + private synchronized boolean updateTopo(Edge edge, Short bw, UpdateType type) { Graph topo = this.topologyBWAware.get(bw); DijkstraShortestPath spt = this.sptBWAware.get(bw); boolean edgePresentInGraph = false; @@ -262,7 +262,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { this.sptBWAware.put(bw, spt); } - if (added) { + switch (type) { + case ADDED: // Make sure the vertex are there before adding the edge topo.addVertex(src.getNode()); topo.addVertex(dst.getNode()); @@ -270,37 +271,39 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { edgePresentInGraph = topo.containsEdge(edge); if (edgePresentInGraph == false) { try { - topo.addEdge(new Edge(src, dst), src.getNode(), - dst.getNode(), EdgeType.DIRECTED); - } catch (ConstructionException e) { + topo.addEdge(new Edge(src, dst), src.getNode(), dst.getNode(), EdgeType.DIRECTED); + } catch (final ConstructionException e) { log.error("", e); return edgePresentInGraph; } } - } else { + case CHANGED: + // Mainly raised only on properties update, so not really useful + // in this case + break; + case REMOVED: // Remove the edge try { topo.removeEdge(new Edge(src, dst)); - } catch (ConstructionException e) { + } catch (final ConstructionException e) { log.error("", e); return edgePresentInGraph; } // If the src and dst vertex don't have incoming or // outgoing links we can get ride of them - if (topo.containsVertex(src.getNode()) - && topo.inDegree(src.getNode()) == 0 - && topo.outDegree(src.getNode()) == 0) { + if (topo.containsVertex(src.getNode()) && (topo.inDegree(src.getNode()) == 0) + && (topo.outDegree(src.getNode()) == 0)) { log.debug("Removing vertex {}", src); topo.removeVertex(src.getNode()); } - if (topo.containsVertex(dst.getNode()) - && topo.inDegree(dst.getNode()) == 0 - && topo.outDegree(dst.getNode()) == 0) { + if (topo.containsVertex(dst.getNode()) && (topo.inDegree(dst.getNode()) == 0) + && (topo.outDegree(dst.getNode()) == 0)) { log.debug("Removing vertex {}", dst); topo.removeVertex(dst.getNode()); } + break; } spt.reset(); if (bw.equals(baseBW)) { @@ -312,11 +315,13 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { return edgePresentInGraph; } - private boolean edgeUpdate(Edge e, UpdateType type, Set props) { + private boolean edgeUpdate(Edge e, UpdateType type, Set props, boolean local) { String srcType = null; String dstType = null; - if (e == null || type == null) { + log.trace("Got an edgeUpdate: {} props: {} update type: {} local: {}", new Object[] { e, props, type, local }); + + if ((e == null) || (type == null)) { log.error("Edge or Update type are null!"); return false; } else { @@ -336,21 +341,17 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { Bandwidth bw = new Bandwidth(0); boolean newEdge = false; - if (props != null) + if (props != null) { props.remove(bw); - - if (log.isDebugEnabled()) { - log.debug("edgeUpdate: {} bw: {}", e, bw.getValue()); } Short baseBW = Short.valueOf((short) 0); - boolean add = (type == UpdateType.ADDED) ? true : false; // Update base topo - newEdge = !updateTopo(e, baseBW, add); + newEdge = !updateTopo(e, baseBW, type); if (newEdge == true) { if (bw.getValue() != baseBW) { // Update BW topo - updateTopo(e, (short) bw.getValue(), add); + updateTopo(e, (short) bw.getValue(), type); } } return newEdge; @@ -358,16 +359,30 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { @Override public void edgeUpdate(List topoedgeupdateList) { + log.trace("Start of a Bulk EdgeUpdate with " + topoedgeupdateList.size() + " elements"); boolean callListeners = false; for (int i = 0; i < topoedgeupdateList.size(); i++) { Edge e = topoedgeupdateList.get(i).getEdge(); - Set p = topoedgeupdateList.get(i).getProperty(); - UpdateType type = topoedgeupdateList.get(i).getUpdateType(); - if ((edgeUpdate(e, type, p)) && (!callListeners)) { + Set p = topoedgeupdateList.get(i) + .getProperty(); + UpdateType type = topoedgeupdateList.get(i) + .getUpdateType(); + boolean isLocal = topoedgeupdateList.get(i) + .isLocal(); + if ((edgeUpdate(e, type, p, isLocal)) && (!callListeners)) { callListeners = true; } } - if ((callListeners) && (this.routingAware != null)) { + + // The routing listeners should only be called on the coordinator, to + // avoid multiple controller cluster nodes to actually do the + // recalculation when only one need to react + boolean amICoordinator = true; + if (this.clusterContainerService != null) { + amICoordinator = this.clusterContainerService.amICoordinator(); + } + if ((callListeners) && (this.routingAware != null) && amICoordinator) { + log.trace("Calling the routing listeners"); for (IListenRoutingUpdates ra : this.routingAware) { try { ra.recalculateDone(); @@ -376,6 +391,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { } } } + log.trace("End of a Bulk EdgeUpdate"); } /** @@ -386,8 +402,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { @SuppressWarnings({ "unchecked", "rawtypes" }) public void init() { log.debug("Routing init() is called"); - this.topologyBWAware = (ConcurrentMap>) new ConcurrentHashMap(); - this.sptBWAware = (ConcurrentMap>) new ConcurrentHashMap(); + this.topologyBWAware = new ConcurrentHashMap>(); + this.sptBWAware = new ConcurrentHashMap>(); // Now create the default topology, which doesn't consider the // BW, also create the corresponding Dijkstra calculation Graph g = new SparseMultigraph(); @@ -443,18 +459,6 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { log.debug("Routing stop() is called"); } - @Override - public void edgeOverUtilized(Edge edge) { - // TODO Auto-generated method stub - - } - - @Override - public void edgeUtilBackToNormal(Edge edge) { - // TODO Auto-generated method stub - - } - public void setSwitchManager(ISwitchManager switchManager) { this.switchManager = switchManager; } @@ -465,16 +469,6 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { } } - public void setReadService(IReadService readService) { - this.readService = readService; - } - - public void unsetReadService(IReadService readService) { - if (this.readService == readService) { - this.readService = null; - } - } - public void setTopologyManager(ITopologyManager tm) { this.topologyManager = tm; } @@ -484,4 +478,28 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware { this.topologyManager = null; } } + + void setClusterContainerService(IClusterContainerServices s) { + log.debug("Cluster Service set"); + this.clusterContainerService = s; + } + + void unsetClusterContainerService(IClusterContainerServices s) { + if (this.clusterContainerService == s) { + log.debug("Cluster Service removed!"); + this.clusterContainerService = null; + } + } + + @Override + public void edgeOverUtilized(Edge edge) { + // TODO Auto-generated method stub + + } + + @Override + public void edgeUtilBackToNormal(Edge edge) { + // TODO Auto-generated method stub + + } } diff --git a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java new file mode 100644 index 0000000000..a6645cce14 --- /dev/null +++ b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.routing.dijkstra_implementation.internal; + +import java.util.Dictionary; +import java.util.Hashtable; + +import org.apache.felix.service.command.Descriptor; +import org.opendaylight.controller.sal.core.Node; +import org.opendaylight.controller.sal.core.Path; +import org.opendaylight.controller.sal.routing.IRouting; +import org.opendaylight.controller.sal.utils.ServiceHelper; +import org.osgi.framework.ServiceRegistration; + +public class DijkstraImplementationCLI { + @SuppressWarnings("rawtypes") + private ServiceRegistration sr = null; + + public void init() { + } + + public void destroy() { + } + + public void start() { + final Dictionary props = new Hashtable(); + props.put("osgi.command.scope", "odpcontroller"); + props.put("osgi.command.function", new String[] { "getRoute" }); + this.sr = ServiceHelper.registerGlobalServiceWReg(DijkstraImplementationCLI.class, this, props); + } + + public void stop() { + if (this.sr != null) { + this.sr.unregister(); + this.sr = null; + } + } + + @Descriptor("Retrieves a Route between two Nodes in the discovered Topology DB") + public void getRoute( + @Descriptor("Container on the context of which the routing service need to be looked up") String container, + @Descriptor("String representation of the Source Node, this need to be consumable from Node.fromString()") String srcNode, + @Descriptor("String representation of the Destination Node") String dstNode) { + final IRouting r = (IRouting) ServiceHelper.getInstance(IRouting.class, container, this); + + if (r == null) { + System.out.println("Cannot find the routing instance on container:" + container); + return; + } + + final Node src = Node.fromString(srcNode); + final Node dst = Node.fromString(dstNode); + final Path p = r.getRoute(src, dst); + if (p != null) { + System.out.println("Route between srcNode:" + src + " and dstNode:" + dst + " = " + p); + } else { + System.out.println("There is no route between srcNode:" + src + " and dstNode:" + dst); + } + } +} diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/topology/TopoEdgeUpdate.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/topology/TopoEdgeUpdate.java index 58bf350c40..0208cc7cda 100644 --- a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/topology/TopoEdgeUpdate.java +++ b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/topology/TopoEdgeUpdate.java @@ -17,16 +17,29 @@ import org.opendaylight.controller.sal.core.UpdateType; /** * The class represents an Edge, the Edge's Property Set and its UpdateType. */ - public class TopoEdgeUpdate { private Edge edge; private Set props; private UpdateType type; + private boolean isLocal; + /** + * Constructor for a topology element update. A TopologyElementUpdate is an + * object that summarize what has happened on an Edge and if the update is + * generated locally to this controller or no + * + * @param e + * Edge being updated + * @param p + * Set of Properties attached to the edge + * @param t + * Type of update + */ public TopoEdgeUpdate(Edge e, Set p, UpdateType t) { edge = e; props = p; type = t; + setLocal(true); } public Edge getEdge() { @@ -45,39 +58,46 @@ public class TopoEdgeUpdate { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((edge == null) ? 0 : edge.hashCode()); - result = prime * result + ((props == null) ? 0 : props.hashCode()); - result = prime * result + ((type == null) ? 0 : type.hashCode()); + result = (prime * result) + ((edge == null) ? 0 : edge.hashCode()); + result = (prime * result) + ((type == null) ? 0 : type.hashCode()); return result; } @Override public String toString() { - return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type=" - + type + "]"; + return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type=" + type + ", isLocal=" + isLocal + "]"; } @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } TopoEdgeUpdate other = (TopoEdgeUpdate) obj; if (edge == null) { - if (other.edge != null) - return false; - } else if (!edge.equals(other.edge)) - return false; - if (props == null) { - if (other.props != null) + if (other.edge != null) { return false; - } else if (!props.equals(other.props)) + } + } else if (!edge.equals(other.edge)) { return false; - if (type != other.type) + } + if (type != other.type) { return false; + } return true; } + + public boolean isLocal() { + return isLocal; + } + + public void setLocal(boolean isLocal) { + this.isLocal = isLocal; + } } diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java new file mode 100644 index 0000000000..aa47b5fa95 --- /dev/null +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.topologymanager; + +public interface ITopologyManagerClusterWideAware extends ITopologyManagerAware { +} diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java index 0da1a2ee39..d8ff141ede 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java @@ -15,6 +15,7 @@ import java.util.Hashtable; import java.util.Set; import org.apache.felix.dm.Component; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase; @@ -22,6 +23,7 @@ import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; +import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +37,7 @@ public class Activator extends ComponentActivatorAbstractBase { * ComponentActivatorAbstractBase. * */ + @Override public void init() { } @@ -44,6 +47,7 @@ public class Activator extends ComponentActivatorAbstractBase { * cleanup done by ComponentActivatorAbstractBase * */ + @Override public void destroy() { } @@ -57,6 +61,7 @@ public class Activator extends ComponentActivatorAbstractBase { * instantiated in order to get an fully working implementation * Object */ + @Override public Object[] getImplementations() { Object[] res = { TopologyManagerImpl.class }; return res; @@ -75,17 +80,19 @@ public class Activator extends ComponentActivatorAbstractBase { * also optional per-container different behavior if needed, usually * should not be the case though. */ + @Override public void configureInstance(Component c, Object imp, String containerName) { if (imp.equals(TopologyManagerImpl.class)) { // export the service needed to listen to topology updates Dictionary> props = new Hashtable>(); Set propSet = new HashSet(); - propSet.add("topologymanager.configSaveEvent"); + propSet.add(TopologyManagerImpl.TOPOEDGESDB); props.put("cachenames", propSet); c.setInterface(new String[] { IListenTopoUpdates.class.getName(), ITopologyManager.class.getName(), - IConfigurationContainerAware.class.getName() }, props); + IConfigurationContainerAware.class.getName(), + ICacheUpdateAware.class.getName() }, props); c.add(createContainerServiceDependency(containerName).setService( ITopologyService.class).setCallbacks("setTopoService", @@ -98,6 +105,13 @@ public class Activator extends ComponentActivatorAbstractBase { "setTopologyManagerAware", "unsetTopologyManagerAware") .setRequired(false)); + // These are all the listeners for a topology manager for the + // cluster wide events + // updates, there could be many or none + c.add(createContainerServiceDependency(containerName).setService(ITopologyManagerClusterWideAware.class) + .setCallbacks("setTopologyManagerClusterWideAware", "unsetTopologyManagerClusterWideAware") + .setRequired(false)); + c.add(createContainerServiceDependency(containerName).setService( IClusterContainerServices.class).setCallbacks( "setClusterContainerService", diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index a043cbe925..4972d3b5b5 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -21,9 +21,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.felix.dm.Component; @@ -31,6 +33,7 @@ import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; @@ -52,6 +55,7 @@ import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; +import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; @@ -63,9 +67,17 @@ import org.slf4j.LoggerFactory; * network topology. It provides service for applications to interact with * topology database and notifies all the listeners of topology changes. */ -public class TopologyManagerImpl implements ITopologyManager, -IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, -CommandProvider { +public class TopologyManagerImpl implements + ICacheUpdateAware, + ITopologyManager, + IConfigurationContainerAware, + IListenTopoUpdates, + IObjectReader, + CommandProvider { + static final String TOPOEDGESDB = "topologymanager.edgesDB"; + static final String TOPOHOSTSDB = "topologymanager.hostsDB"; + static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB"; + static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); private static final String SAVE = "Save"; private ITopologyService topoService; @@ -79,13 +91,16 @@ CommandProvider { // DB of all the NodeConnectors with an Host attached to it private ConcurrentMap>> hostsDB; // Topology Manager Aware listeners - private Set topologyManagerAware = - new CopyOnWriteArraySet();; - + private Set topologyManagerAware = new CopyOnWriteArraySet(); + // Topology Manager Aware listeners - for clusterwide updates + private Set topologyManagerClusterWideAware = + new CopyOnWriteArraySet(); private static String ROOT = GlobalConstants.STARTUPHOME.toString(); private String userLinksFileName; private ConcurrentMap userLinksDB; - private ConcurrentMap configSaveEvent; + private BlockingQueue notifyQ = new LinkedBlockingQueue(); + private volatile Boolean shuttingDown = false; + private Thread notifyThread; void nonClusterObjectCreate() { @@ -93,7 +108,6 @@ CommandProvider { hostsDB = new ConcurrentHashMap>>(); nodeConnectorsDB = new ConcurrentHashMap>(); userLinksDB = new ConcurrentHashMap(); - configSaveEvent = new ConcurrentHashMap(); } void setTopologyManagerAware(ITopologyManagerAware s) { @@ -110,6 +124,20 @@ CommandProvider { } } + void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) { + if (this.topologyManagerClusterWideAware != null) { + log.debug("Adding ITopologyManagerClusterWideAware: {}", s); + this.topologyManagerClusterWideAware.add(s); + } + } + + void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) { + if (this.topologyManagerClusterWideAware != null) { + log.debug("Removing ITopologyManagerClusterWideAware: {}", s); + this.topologyManagerClusterWideAware.remove(s); + } + } + void setTopoService(ITopologyService s) { log.debug("Adding ITopologyService: {}", s); this.topoService = s; @@ -140,10 +168,8 @@ CommandProvider { * */ void init(Component c) { - allocateCaches(); retrieveCaches(); - String containerName = null; Dictionary props = c.getServiceProperties(); if (props != null) { @@ -156,61 +182,52 @@ CommandProvider { userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; registerWithOSGIConsole(); loadConfiguration(); + // Restore the shuttingDown status on init of the component + shuttingDown = false; + notifyThread = new Thread(new TopologyNotify(notifyQ)); } @SuppressWarnings({ "unchecked", "deprecation" }) - private void allocateCaches(){ - if (this.clusterContainerService == null) { - nonClusterObjectCreate(); - log.error("Cluster Services unavailable, allocated non-cluster caches!"); - return; - } - + private void allocateCaches() { try { - this.edgesDB = (ConcurrentMap>) this.clusterContainerService.createCache( - "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.edgesDB = + (ConcurrentMap>) this.clusterContainerService.createCache(TOPOEDGESDB, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed"); + log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode"); + log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode"); } try { - this.hostsDB = (ConcurrentMap>>) this.clusterContainerService - .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.hostsDB = + (ConcurrentMap>>) this.clusterContainerService.createCache( + TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed"); + log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode"); + log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode"); } try { - this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.nodeConnectorsDB = + (ConcurrentMap>) this.clusterContainerService.createCache( + TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed"); + log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode"); + log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode"); } try { - this.userLinksDB = (ConcurrentMap) this.clusterContainerService - .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.userLinksDB = + (ConcurrentMap) this.clusterContainerService.createCache( + TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed"); + log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode"); + log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode"); } - - try { - this.configSaveEvent = (ConcurrentMap) this.clusterContainerService - .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); - } catch (CacheExistException cee) { - log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed"); - } catch (CacheConfigException cce) { - log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode"); - } - } @SuppressWarnings({ "unchecked", "deprecation" }) @@ -220,36 +237,28 @@ CommandProvider { return; } - this.edgesDB = (ConcurrentMap>) this.clusterContainerService - .getCache("topologymanager.edgesDB"); + this.edgesDB = (ConcurrentMap>) this.clusterContainerService.getCache(TOPOEDGESDB); if (edgesDB == null) { - log.error("Failed to get cache for topologymanager.edgesDB"); + log.error("Failed to get cache for " + TOPOEDGESDB); } - this.hostsDB = (ConcurrentMap>>) this.clusterContainerService - .getCache("topologymanager.hostsDB"); + this.hostsDB = + (ConcurrentMap>>) this.clusterContainerService.getCache(TOPOHOSTSDB); if (hostsDB == null) { - log.error("Failed to get cache for topologymanager.hostsDB"); + log.error("Failed to get cache for " + TOPOHOSTSDB); } - this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService - .getCache("topologymanager.nodeConnectorDB"); + this.nodeConnectorsDB = + (ConcurrentMap>) this.clusterContainerService.getCache(TOPONODECONNECTORDB); if (nodeConnectorsDB == null) { - log.error("Failed to get cache for topologymanager.nodeConnectorDB"); + log.error("Failed to get cache for " + TOPONODECONNECTORDB); } - this.userLinksDB = (ConcurrentMap) this.clusterContainerService - .getCache("topologymanager.userLinksDB"); + this.userLinksDB = + (ConcurrentMap) this.clusterContainerService.getCache(TOPOUSERLINKSDB); if (userLinksDB == null) { - log.error("Failed to get cache for topologymanager.userLinksDB"); + log.error("Failed to get cache for " + TOPOUSERLINKSDB); } - - this.configSaveEvent = (ConcurrentMap) this.clusterContainerService - .getCache("topologymanager.configSaveEvent"); - if (configSaveEvent == null) { - log.error("Failed to get cache for topologymanager.configSaveEvent"); - } - } /** @@ -258,12 +267,19 @@ CommandProvider { * */ void started() { + // Start the batcher thread for the cluster wide topology updates + notifyThread.start(); // SollicitRefresh MUST be called here else if called at init // time it may sollicit refresh too soon. log.debug("Sollicit topology refresh"); topoService.sollicitRefresh(); } + void stop() { + shuttingDown = true; + notifyThread.interrupt(); + } + /** * Function called by the dependency manager when at least one dependency * become unsatisfied or when the component is shutting down because for @@ -271,6 +287,8 @@ CommandProvider { * */ void destroy() { + notifyQ.clear(); + notifyThread = null; } @SuppressWarnings("unchecked") @@ -288,8 +306,6 @@ CommandProvider { @Override public Status saveConfig() { - // Publish the save config event to the cluster - configSaveEvent.put(new Date().getTime(), SAVE ); return saveConfigInternal(); } @@ -436,7 +452,7 @@ CommandProvider { @Override public Host getHostAttachedToNodeConnector(NodeConnector port) { ImmutablePair> host; - if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) { + if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) { return null; } return host.getLeft(); @@ -674,7 +690,7 @@ CommandProvider { TopologyUserLinkConfig link = userLinksDB.remove(linkName); Edge linkTuple; - if (link != null && (linkTuple = getLinkTuple(link)) != null) { + if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) { if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } @@ -799,4 +815,92 @@ CommandProvider { log.warn("Link Utilization back to normal: {}", edge); } + private void edgeUpdateClusterWide(Edge e, UpdateType type, Set props, boolean isLocal) { + TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type); + upd.setLocal(isLocal); + notifyQ.add(upd); + } + + @Override + public void entryCreated(final Object key, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + // This is the case of an Edge being added to the topology DB + final Edge e = (Edge) key; + log.trace("Edge {} CREATED isLocal:{}", e, originLocal); + edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal); + } + } + + @Override + public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + final Edge e = (Edge) key; + log.trace("Edge {} CHANGED isLocal:{}", e, originLocal); + final Set props = (Set) new_value; + edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal); + } + } + + @Override + public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + final Edge e = (Edge) key; + log.trace("Edge {} DELETED isLocal:{}", e, originLocal); + edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal); + } + } + + class TopologyNotify implements Runnable { + private final BlockingQueue notifyQ; + private TopoEdgeUpdate entry; + private List teuList = new ArrayList(); + private boolean notifyListeners; + + TopologyNotify(BlockingQueue notifyQ) { + this.notifyQ = notifyQ; + } + + @Override + public void run() { + while (true) { + try { + log.trace("New run of TopologyNotify"); + notifyListeners = false; + // First we block waiting for an element to get in + entry = notifyQ.take(); + // Then we drain the whole queue if elements are + // in it without getting into any blocking condition + for (; entry != null; entry = notifyQ.poll()) { + teuList.add(entry); + notifyListeners = true; + } + + // Notify listeners only if there were updates drained else + // give up + if (notifyListeners) { + log.trace("Notifier thread, notified a listener"); + // Now update the listeners + for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) { + try { + s.edgeUpdate(teuList); + } catch (Exception exc) { + log.error("Exception on edge update:", exc); + } + } + } + teuList.clear(); + + // Lets sleep for sometime to allow aggregation of event + Thread.sleep(100); + } catch (InterruptedException e1) { + log.warn("TopologyNotify interrupted {}", e1.getMessage()); + if (shuttingDown) { + return; + } + } catch (Exception e2) { + log.error("", e2); + } + } + } + } } -- 2.36.6