Implement cluster wide topology notifications and let routing use it 31/831/2
authorGiovanni Meo <gmeo@cisco.com>
Thu, 8 Aug 2013 17:43:38 +0000 (19:43 +0200)
committerGiovanni Meo <gmeo@cisco.com>
Fri, 9 Aug 2013 07:54:40 +0000 (09:54 +0200)
- Made TopologyManager to generate ClusterWide updates along with the
local ones.
- Implemented a mechanism in topology manager to batch the topology
updates being synched via clustering services. This would save on
unnecessary churn in recalculations under massive topology updates.
- Modified routing Dijkstra to be an ITopologyManagerClusterWideAware
client rather than on of ITopologyManagerAware.
- Modified Dijkstra implementation to generate routing updates
notifications only on the coordinator, in fact given Dijkstra will
have the same view on all the controllers node in the cluster, then
it's pointless to have all the clients of IRoutingAware to
recalculate, they would lead to the same result.
- Dijkstra edgeUpdate logic was considering the CHANGED topology event
as DELETE, that is wrong and this patch fix it.
- Added gogo shell CLI to get a route from the
DijkstraImplementation.java
- Enhanced "TopoEdgeUpdate" class to has the isLocal flag to
distinguish between the local updates and the remotes one. Also remove
non-key field from equal and hashCode calculation.
- Remove unnecessary CONFIGSAVEEVENT in TopologyManagerImpl because
now the configuration service provides a cluster wide trigger.

Change-Id: Ia74d9d1ec0731e1f5815a69edc25bbb5b4c1f531
Signed-off-by: Giovanni Meo <gmeo@cisco.com>
opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java
opendaylight/routing/dijkstra_implementation/pom.xml
opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/Activator.java
opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementation.java
opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java [new file with mode: 0644]
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/topology/TopoEdgeUpdate.java
opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java [new file with mode: 0644]
opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java
opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java

index 0a0a9d8828ea3ec6de32739d13e85f99e2dd08bf..f681c35c8c9ea376a3d0399269d798479a296689 100644 (file)
@@ -151,7 +151,7 @@ public class SimpleClient implements CommandProvider {
                 .getILoggerFactory();
         if (lc != null) {
             for (ch.qos.logback.classic.Logger l : lc.getLoggerList()) {
-                if (loggerName == null || l.getName().startsWith(loggerName)) {
+                if ((loggerName == null) || l.getName().startsWith(loggerName)) {
                     ci.println(retrieveLogLevel(l));
                 }
             }
@@ -382,6 +382,7 @@ public class SimpleClient implements CommandProvider {
             ci.println("Cache not supplied");
             return;
         }
+        int count = 0;
         c = (ConcurrentMap<Object, Object>) this.icluster.getCache(containerName, cacheName);
         if (c != null) {
             for (Map.Entry<Object, Object> e : c.entrySet()) {
@@ -394,7 +395,9 @@ public class SimpleClient implements CommandProvider {
                 ci.println("Element " + entry.getKey() + "(hashCode="
                         + entry.getKey().hashCode() + ") has value = (" + res
                         + ")");
+                count++;
             }
+            ci.println("Dumped " + count + " records");
         } else {
             ci.println("Cache " + cacheName + " on container " + containerName
                     + " not existant!");
index 0e45e06cdf234301f048c99799448c4aae385a57..c0f7afa65ce9125b29c1e8aeb2ce4e2793cb9059 100644 (file)
@@ -28,6 +28,7 @@
               org.opendaylight.controller.sal.topology,
               org.opendaylight.controller.sal.utils,
               org.opendaylight.controller.sal.reader,
+              org.opendaylight.controller.clustering.services,
               org.apache.commons.collections15,
               org.opendaylight.controller.switchmanager,
               org.opendaylight.controller.topologymanager,
@@ -35,6 +36,8 @@
               edu.uci.ics.jung.algorithms.shortestpath,
               edu.uci.ics.jung.graph.util,
               org.apache.felix.dm,
+              org.osgi.framework,
+              org.apache.felix.service.command,
               org.junit;resolution:=optional
             </Import-Package>
             <Bundle-Activator>
index 3cd696854c3c9d89a6f6d5543b3ce276f751dfbc..e25c34c75f8c863262053c10167bc70da0279102 100644 (file)
 package org.opendaylight.controller.routing.dijkstra_implementation.internal;
 
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.Set;
+
 import org.apache.felix.dm.Component;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
 import org.opendaylight.controller.sal.routing.IRouting;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
-import org.opendaylight.controller.sal.reader.IReadService;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
-import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 
 public class Activator extends ComponentActivatorAbstractBase {
     protected static final Logger logger = LoggerFactory
@@ -33,6 +36,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * ComponentActivatorAbstractBase.
      *
      */
+    @Override
     public void init() {
     }
 
@@ -41,6 +45,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * cleanup done by ComponentActivatorAbstractBase
      *
      */
+    @Override
     public void destroy() {
 
     }
@@ -54,6 +59,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * instantiated in order to get an fully working implementation
      * Object
      */
+    @Override
     public Object[] getImplementations() {
         Object[] res = { DijkstraImplementation.class };
         return res;
@@ -72,32 +78,39 @@ public class Activator extends ComponentActivatorAbstractBase {
      * also optional per-container different behavior if needed, usually
      * should not be the case though.
      */
-    public void configureInstance(Component c, Object imp, String containerName) {
+    @Override
+    public void configureInstance(final Component c, final Object imp, final String containerName) {
         if (imp.equals(DijkstraImplementation.class)) {
             // export the service
-            Dictionary<String, String> props = new Hashtable<String, String>();
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
             props.put("topoListenerName", "routing.Dijkstra");
-            c.setInterface(new String[] { ITopologyManagerAware.class.getName(),
-                    IRouting.class.getName() }, props);
+
+            c.setInterface(new String[] { ITopologyManagerClusterWideAware.class.getName(), IRouting.class.getName() },
+                    props);
 
             // Now lets add a service dependency to make sure the
             // provider of service exists
-            c.add(createContainerServiceDependency(containerName).setService(
-                    IListenRoutingUpdates.class).setCallbacks(
-                    "setListenRoutingUpdates", "unsetListenRoutingUpdates")
+            c.add(createContainerServiceDependency(containerName).setService(IListenRoutingUpdates.class)
+                    .setCallbacks("setListenRoutingUpdates", "unsetListenRoutingUpdates")
                     .setRequired(false));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    ISwitchManager.class).setCallbacks("setSwitchManager",
-                    "unsetSwitchManager").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(ISwitchManager.class)
+                    .setCallbacks("setSwitchManager", "unsetSwitchManager")
+                    .setRequired(true));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    ITopologyManager.class).setCallbacks("setTopologyManager",
-                    "unsetTopologyManager").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(ITopologyManager.class)
+                    .setCallbacks("setTopologyManager", "unsetTopologyManager")
+                    .setRequired(true));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    IReadService.class).setCallbacks("setReadService",
-                    "unsetReadService").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(IClusterContainerServices.class)
+                    .setCallbacks("setClusterContainerService", "unsetClusterContainerService")
+                    .setRequired(true));
         }
     }
+
+    @Override
+    protected Object[] getGlobalImplementations() {
+        final Object[] res = { DijkstraImplementationCLI.class };
+        return res;
+    }
 }
index 3a0faa91d4bbca4018d18022ac2418b2e8fa8804..a8ef381528d5a9fc8f839e40427e4e7f9e9d27de 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.opendaylight.controller.routing.dijkstra_implementation.internal;
 
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Edge;
@@ -24,19 +25,18 @@ import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Path;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
-import org.opendaylight.controller.sal.reader.IReadService;
 import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
-
 import org.opendaylight.controller.sal.routing.IRouting;
 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
-import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 
 import edu.uci.ics.jung.algorithms.shortestpath.DijkstraShortestPath;
 import edu.uci.ics.jung.graph.Graph;
 import edu.uci.ics.jung.graph.SparseMultigraph;
 import edu.uci.ics.jung.graph.util.EdgeType;
+
 import java.lang.Exception;
 import java.lang.IllegalArgumentException;
 import java.util.HashSet;
@@ -47,23 +47,24 @@ import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.collections15.Transformer;
 
-public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
-    private static Logger log = LoggerFactory
-            .getLogger(DijkstraImplementation.class);
+@SuppressWarnings("rawtypes")
+public class DijkstraImplementation implements IRouting, ITopologyManagerClusterWideAware {
+    private static Logger log = LoggerFactory.getLogger(DijkstraImplementation.class);
     private ConcurrentMap<Short, Graph<Node, Edge>> topologyBWAware;
     private ConcurrentMap<Short, DijkstraShortestPath<Node, Edge>> sptBWAware;
     DijkstraShortestPath<Node, Edge> mtp; // Max Throughput Path
     private Set<IListenRoutingUpdates> routingAware;
     private ISwitchManager switchManager;
     private ITopologyManager topologyManager;
-    private IReadService readService;
     private static final long DEFAULT_LINK_SPEED = Bandwidth.BW1Gbps;
+    private IClusterContainerServices clusterContainerService;
 
-    public void setListenRoutingUpdates(IListenRoutingUpdates i) {
+    public void setListenRoutingUpdates(final IListenRoutingUpdates i) {
         if (this.routingAware == null) {
             this.routingAware = new HashSet<IListenRoutingUpdates>();
         }
@@ -73,7 +74,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         }
     }
 
-    public void unsetListenRoutingUpdates(IListenRoutingUpdates i) {
+    public void unsetListenRoutingUpdates(final IListenRoutingUpdates i) {
         if (this.routingAware == null) {
             return;
         }
@@ -95,6 +96,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         Transformer<Edge, ? extends Number> mtTransformer = null;
         if (EdgeWeightMap == null) {
             mtTransformer = new Transformer<Edge, Double>() {
+                @Override
                 public Double transform(Edge e) {
                     if (switchManager == null) {
                         log.error("switchManager is null");
@@ -106,43 +108,39 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                         log.error("srcNC:{} or dstNC:{} is null", srcNC, dstNC);
                         return (double) -1;
                     }
-                    Bandwidth bwSrc = (Bandwidth) switchManager
-                            .getNodeConnectorProp(srcNC,
-                                    Bandwidth.BandwidthPropName);
-                    Bandwidth bwDst = (Bandwidth) switchManager
-                            .getNodeConnectorProp(dstNC,
-                                    Bandwidth.BandwidthPropName);
+                    Bandwidth bwSrc = (Bandwidth) switchManager.getNodeConnectorProp(srcNC,
+                            Bandwidth.BandwidthPropName);
+                    Bandwidth bwDst = (Bandwidth) switchManager.getNodeConnectorProp(dstNC,
+                            Bandwidth.BandwidthPropName);
 
                     long srcLinkSpeed = 0, dstLinkSpeed = 0;
-                    if ((bwSrc == null)
-                            || ((srcLinkSpeed = bwSrc.getValue()) == 0)) {
-                        log.debug(
-                                "srcNC: {} - Setting srcLinkSpeed to Default!",
-                                srcNC);
+                    if ((bwSrc == null) || ((srcLinkSpeed = bwSrc.getValue()) == 0)) {
+                        log.debug("srcNC: {} - Setting srcLinkSpeed to Default!", srcNC);
                         srcLinkSpeed = DEFAULT_LINK_SPEED;
                     }
 
-                    if ((bwDst == null)
-                            || ((dstLinkSpeed = bwDst.getValue()) == 0)) {
-                        log.debug(
-                                "dstNC: {} - Setting dstLinkSpeed to Default!",
-                                dstNC);
+                    if ((bwDst == null) || ((dstLinkSpeed = bwDst.getValue()) == 0)) {
+                        log.debug("dstNC: {} - Setting dstLinkSpeed to Default!", dstNC);
                         dstLinkSpeed = DEFAULT_LINK_SPEED;
                     }
 
-                    long avlSrcThruPut = srcLinkSpeed
-                            - readService.getTransmitRate(srcNC);
-                    long avlDstThruPut = dstLinkSpeed
-                            - readService.getTransmitRate(dstNC);
-
-                    // Use lower of the 2 available thruput as the available
-                    // thruput
-                    long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut
-                            : avlDstThruPut;
+                    // TODO: revisit the logic below with the real use case in
+                    // mind
+                    // For now we assume the throughput to be the speed of the
+                    // link itself
+                    // this kind of logic require information that should be
+                    // polled by statistic manager and are not yet available,
+                    // also this service at the moment is not used, so to be
+                    // revisited later on
+                    long avlSrcThruPut = srcLinkSpeed;
+                    long avlDstThruPut = dstLinkSpeed;
+
+                    // Use lower of the 2 available throughput as the available
+                    // throughput
+                    long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut : avlDstThruPut;
 
                     if (avlThruPut <= 0) {
-                        log.debug("Edge {}: Available Throughput {} <= 0!", e,
-                                avlThruPut);
+                        log.debug("Edge {}: Available Throughput {} <= 0!", e, avlThruPut);
                         return (double) -1;
                     }
                     return (double) (Bandwidth.BW1Pbps / avlThruPut);
@@ -150,6 +148,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
             };
         } else {
             mtTransformer = new Transformer<Edge, Number>() {
+                @Override
                 public Number transform(Edge e) {
                     return EdgeWeightMap.get(e);
                 }
@@ -166,8 +165,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
     }
 
     @Override
-    public Path getRoute(Node src, Node dst) {
-        if (src == null || dst == null) {
+    public Path getRoute(final Node src, final Node dst) {
+        if ((src == null) || (dst == null)) {
             return null;
         }
         return getRoute(src, dst, (short) 0);
@@ -198,10 +197,11 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
     }
 
     @Override
-    public synchronized Path getRoute(Node src, Node dst, Short Bw) {
+    public synchronized Path getRoute(final Node src, final Node dst, final Short Bw) {
         DijkstraShortestPath<Node, Edge> spt = this.sptBWAware.get(Bw);
-        if (spt == null)
+        if (spt == null) {
             return null;
+        }
         List<Edge> path;
         try {
             path = spt.getPath(src, dst);
@@ -238,8 +238,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         }
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private synchronized boolean updateTopo(Edge edge, Short bw, boolean added) {
+    @SuppressWarnings({ "unchecked" })
+    private synchronized boolean updateTopo(Edge edge, Short bw, UpdateType type) {
         Graph<Node, Edge> topo = this.topologyBWAware.get(bw);
         DijkstraShortestPath<Node, Edge> spt = this.sptBWAware.get(bw);
         boolean edgePresentInGraph = false;
@@ -262,7 +262,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                 this.sptBWAware.put(bw, spt);
             }
 
-            if (added) {
+            switch (type) {
+            case ADDED:
                 // Make sure the vertex are there before adding the edge
                 topo.addVertex(src.getNode());
                 topo.addVertex(dst.getNode());
@@ -270,37 +271,39 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                 edgePresentInGraph = topo.containsEdge(edge);
                 if (edgePresentInGraph == false) {
                     try {
-                        topo.addEdge(new Edge(src, dst), src.getNode(),
-                                dst.getNode(), EdgeType.DIRECTED);
-                    } catch (ConstructionException e) {
+                        topo.addEdge(new Edge(src, dst), src.getNode(), dst.getNode(), EdgeType.DIRECTED);
+                    } catch (final ConstructionException e) {
                         log.error("", e);
                         return edgePresentInGraph;
                     }
                 }
-            } else {
+            case CHANGED:
+                // Mainly raised only on properties update, so not really useful
+                // in this case
+                break;
+            case REMOVED:
                 // Remove the edge
                 try {
                     topo.removeEdge(new Edge(src, dst));
-                } catch (ConstructionException e) {
+                } catch (final ConstructionException e) {
                     log.error("", e);
                     return edgePresentInGraph;
                 }
 
                 // If the src and dst vertex don't have incoming or
                 // outgoing links we can get ride of them
-                if (topo.containsVertex(src.getNode())
-                        && topo.inDegree(src.getNode()) == 0
-                        && topo.outDegree(src.getNode()) == 0) {
+                if (topo.containsVertex(src.getNode()) && (topo.inDegree(src.getNode()) == 0)
+                        && (topo.outDegree(src.getNode()) == 0)) {
                     log.debug("Removing vertex {}", src);
                     topo.removeVertex(src.getNode());
                 }
 
-                if (topo.containsVertex(dst.getNode())
-                        && topo.inDegree(dst.getNode()) == 0
-                        && topo.outDegree(dst.getNode()) == 0) {
+                if (topo.containsVertex(dst.getNode()) && (topo.inDegree(dst.getNode()) == 0)
+                        && (topo.outDegree(dst.getNode()) == 0)) {
                     log.debug("Removing vertex {}", dst);
                     topo.removeVertex(dst.getNode());
                 }
+                break;
             }
             spt.reset();
             if (bw.equals(baseBW)) {
@@ -312,11 +315,13 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         return edgePresentInGraph;
     }
 
-    private boolean edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
+    private boolean edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean local) {
         String srcType = null;
         String dstType = null;
 
-        if (e == null || type == null) {
+        log.trace("Got an edgeUpdate: {} props: {} update type: {} local: {}", new Object[] { e, props, type, local });
+
+        if ((e == null) || (type == null)) {
             log.error("Edge or Update type are null!");
             return false;
         } else {
@@ -336,21 +341,17 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
 
         Bandwidth bw = new Bandwidth(0);
         boolean newEdge = false;
-        if (props != null)
+        if (props != null) {
             props.remove(bw);
-
-        if (log.isDebugEnabled()) {
-          log.debug("edgeUpdate: {} bw: {}", e, bw.getValue());
         }
 
         Short baseBW = Short.valueOf((short) 0);
-        boolean add = (type == UpdateType.ADDED) ? true : false;
         // Update base topo
-        newEdge = !updateTopo(e, baseBW, add);
+        newEdge = !updateTopo(e, baseBW, type);
         if (newEdge == true) {
             if (bw.getValue() != baseBW) {
                 // Update BW topo
-                updateTopo(e, (short) bw.getValue(), add);
+                updateTopo(e, (short) bw.getValue(), type);
             }
         }
         return newEdge;
@@ -358,16 +359,30 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
 
     @Override
     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        log.trace("Start of a Bulk EdgeUpdate with " + topoedgeupdateList.size() + " elements");
         boolean callListeners = false;
         for (int i = 0; i < topoedgeupdateList.size(); i++) {
             Edge e = topoedgeupdateList.get(i).getEdge();
-            Set<Property> p = topoedgeupdateList.get(i).getProperty();
-            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
-            if ((edgeUpdate(e, type, p)) && (!callListeners)) {
+            Set<Property> p = topoedgeupdateList.get(i)
+                    .getProperty();
+            UpdateType type = topoedgeupdateList.get(i)
+                    .getUpdateType();
+            boolean isLocal = topoedgeupdateList.get(i)
+                    .isLocal();
+            if ((edgeUpdate(e, type, p, isLocal)) && (!callListeners)) {
                 callListeners = true;
             }
         }
-        if ((callListeners) && (this.routingAware != null)) {
+
+        // The routing listeners should only be called on the coordinator, to
+        // avoid multiple controller cluster nodes to actually do the
+        // recalculation when only one need to react
+        boolean amICoordinator = true;
+        if (this.clusterContainerService != null) {
+            amICoordinator = this.clusterContainerService.amICoordinator();
+        }
+        if ((callListeners) && (this.routingAware != null) && amICoordinator) {
+            log.trace("Calling the routing listeners");
             for (IListenRoutingUpdates ra : this.routingAware) {
                 try {
                     ra.recalculateDone();
@@ -376,6 +391,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                 }
             }
         }
+        log.trace("End of a Bulk EdgeUpdate");
     }
 
     /**
@@ -386,8 +402,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public void init() {
         log.debug("Routing init() is called");
-        this.topologyBWAware = (ConcurrentMap<Short, Graph<Node, Edge>>) new ConcurrentHashMap();
-        this.sptBWAware = (ConcurrentMap<Short, DijkstraShortestPath<Node, Edge>>) new ConcurrentHashMap();
+        this.topologyBWAware = new ConcurrentHashMap<Short, Graph<Node, Edge>>();
+        this.sptBWAware = new ConcurrentHashMap<Short, DijkstraShortestPath<Node, Edge>>();
         // Now create the default topology, which doesn't consider the
         // BW, also create the corresponding Dijkstra calculation
         Graph<Node, Edge> g = new SparseMultigraph();
@@ -443,18 +459,6 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         log.debug("Routing stop() is called");
     }
 
-    @Override
-    public void edgeOverUtilized(Edge edge) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void edgeUtilBackToNormal(Edge edge) {
-        // TODO Auto-generated method stub
-
-    }
-
     public void setSwitchManager(ISwitchManager switchManager) {
         this.switchManager = switchManager;
     }
@@ -465,16 +469,6 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         }
     }
 
-    public void setReadService(IReadService readService) {
-        this.readService = readService;
-    }
-
-    public void unsetReadService(IReadService readService) {
-        if (this.readService == readService) {
-            this.readService = null;
-        }
-    }
-
     public void setTopologyManager(ITopologyManager tm) {
         this.topologyManager = tm;
     }
@@ -484,4 +478,28 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
             this.topologyManager = null;
         }
     }
+
+    void setClusterContainerService(IClusterContainerServices s) {
+        log.debug("Cluster Service set");
+        this.clusterContainerService = s;
+    }
+
+    void unsetClusterContainerService(IClusterContainerServices s) {
+        if (this.clusterContainerService == s) {
+            log.debug("Cluster Service removed!");
+            this.clusterContainerService = null;
+        }
+    }
+
+    @Override
+    public void edgeOverUtilized(Edge edge) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void edgeUtilBackToNormal(Edge edge) {
+        // TODO Auto-generated method stub
+
+    }
 }
diff --git a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java
new file mode 100644 (file)
index 0000000..a6645cc
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.routing.dijkstra_implementation.internal;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.service.command.Descriptor;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.core.Path;
+import org.opendaylight.controller.sal.routing.IRouting;
+import org.opendaylight.controller.sal.utils.ServiceHelper;
+import org.osgi.framework.ServiceRegistration;
+
+public class DijkstraImplementationCLI {
+    @SuppressWarnings("rawtypes")
+    private ServiceRegistration sr = null;
+
+    public void init() {
+    }
+
+    public void destroy() {
+    }
+
+    public void start() {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("osgi.command.scope", "odpcontroller");
+        props.put("osgi.command.function", new String[] { "getRoute" });
+        this.sr = ServiceHelper.registerGlobalServiceWReg(DijkstraImplementationCLI.class, this, props);
+    }
+
+    public void stop() {
+        if (this.sr != null) {
+            this.sr.unregister();
+            this.sr = null;
+        }
+    }
+
+    @Descriptor("Retrieves a Route between two Nodes in the discovered Topology DB")
+    public void getRoute(
+            @Descriptor("Container on the context of which the routing service need to be looked up") String container,
+            @Descriptor("String representation of the Source Node, this need to be consumable from Node.fromString()") String srcNode,
+            @Descriptor("String representation of the Destination Node") String dstNode) {
+        final IRouting r = (IRouting) ServiceHelper.getInstance(IRouting.class, container, this);
+
+        if (r == null) {
+            System.out.println("Cannot find the routing instance on container:" + container);
+            return;
+        }
+
+        final Node src = Node.fromString(srcNode);
+        final Node dst = Node.fromString(dstNode);
+        final Path p = r.getRoute(src, dst);
+        if (p != null) {
+            System.out.println("Route between srcNode:" + src + " and dstNode:" + dst + " = " + p);
+        } else {
+            System.out.println("There is no route between srcNode:" + src + " and dstNode:" + dst);
+        }
+    }
+}
index 58bf350c40e015878cad1ae01f9de3ca9e7f7596..0208cc7cdac4693a2ffafb42a06d54b0e11e6e71 100644 (file)
@@ -17,16 +17,29 @@ import org.opendaylight.controller.sal.core.UpdateType;
 /**
  * The class represents an Edge, the Edge's Property Set and its UpdateType.
  */
-
 public class TopoEdgeUpdate {
     private Edge edge;
     private Set<Property> props;
     private UpdateType type;
+    private boolean isLocal;
 
+    /**
+     * Constructor for a topology element update. A TopologyElementUpdate is an
+     * object that summarize what has happened on an Edge and if the update is
+     * generated locally to this controller or no
+     *
+     * @param e
+     *            Edge being updated
+     * @param p
+     *            Set of Properties attached to the edge
+     * @param t
+     *            Type of update
+     */
     public TopoEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
         edge = e;
         props = p;
         type = t;
+        setLocal(true);
     }
 
     public Edge getEdge() {
@@ -45,39 +58,46 @@ public class TopoEdgeUpdate {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((edge == null) ? 0 : edge.hashCode());
-        result = prime * result + ((props == null) ? 0 : props.hashCode());
-        result = prime * result + ((type == null) ? 0 : type.hashCode());
+        result = (prime * result) + ((edge == null) ? 0 : edge.hashCode());
+        result = (prime * result) + ((type == null) ? 0 : type.hashCode());
         return result;
     }
 
     @Override
     public String toString() {
-        return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type="
-                + type + "]";
+        return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type=" + type + ", isLocal=" + isLocal + "]";
     }
 
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         TopoEdgeUpdate other = (TopoEdgeUpdate) obj;
         if (edge == null) {
-            if (other.edge != null)
-                return false;
-        } else if (!edge.equals(other.edge))
-            return false;
-        if (props == null) {
-            if (other.props != null)
+            if (other.edge != null) {
                 return false;
-        } else if (!props.equals(other.props))
+            }
+        } else if (!edge.equals(other.edge)) {
             return false;
-        if (type != other.type)
+        }
+        if (type != other.type) {
             return false;
+        }
         return true;
     }
+
+    public boolean isLocal() {
+        return isLocal;
+    }
+
+    public void setLocal(boolean isLocal) {
+        this.isLocal = isLocal;
+    }
 }
diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java
new file mode 100644 (file)
index 0000000..aa47b5f
--- /dev/null
@@ -0,0 +1,11 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.topologymanager;
+
+public interface ITopologyManagerClusterWideAware extends ITopologyManagerAware {
+}
index 0da1a2ee39ab73255aca9149095f15ceb6ac8c45..d8ff141edea4da61492cd9e79bbc76d6acede158 100644 (file)
@@ -15,6 +15,7 @@ import java.util.Hashtable;
 import java.util.Set;
 
 import org.apache.felix.dm.Component;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
@@ -22,6 +23,7 @@ import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * ComponentActivatorAbstractBase.
      *
      */
+    @Override
     public void init() {
 
     }
@@ -44,6 +47,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * cleanup done by ComponentActivatorAbstractBase
      *
      */
+    @Override
     public void destroy() {
 
     }
@@ -57,6 +61,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * instantiated in order to get an fully working implementation
      * Object
      */
+    @Override
     public Object[] getImplementations() {
         Object[] res = { TopologyManagerImpl.class };
         return res;
@@ -75,17 +80,19 @@ public class Activator extends ComponentActivatorAbstractBase {
      * also optional per-container different behavior if needed, usually
      * should not be the case though.
      */
+    @Override
     public void configureInstance(Component c, Object imp, String containerName) {
         if (imp.equals(TopologyManagerImpl.class)) {
             // export the service needed to listen to topology updates
             Dictionary<String, Set<String>> props = new Hashtable<String, Set<String>>();
             Set<String> propSet = new HashSet<String>();
-            propSet.add("topologymanager.configSaveEvent");
+            propSet.add(TopologyManagerImpl.TOPOEDGESDB);
             props.put("cachenames", propSet);
 
             c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
                     ITopologyManager.class.getName(),
-                    IConfigurationContainerAware.class.getName() }, props);
+                    IConfigurationContainerAware.class.getName(),
+                    ICacheUpdateAware.class.getName() }, props);
 
             c.add(createContainerServiceDependency(containerName).setService(
                     ITopologyService.class).setCallbacks("setTopoService",
@@ -98,6 +105,13 @@ public class Activator extends ComponentActivatorAbstractBase {
                     "setTopologyManagerAware", "unsetTopologyManagerAware")
                     .setRequired(false));
 
+            // These are all the listeners for a topology manager for the
+            // cluster wide events
+            // updates, there could be many or none
+            c.add(createContainerServiceDependency(containerName).setService(ITopologyManagerClusterWideAware.class)
+                    .setCallbacks("setTopologyManagerClusterWideAware", "unsetTopologyManagerClusterWideAware")
+                    .setRequired(false));
+
             c.add(createContainerServiceDependency(containerName).setService(
                     IClusterContainerServices.class).setCallbacks(
                     "setClusterContainerService",
index a043cbe92595ff011483296bc1fc27be6bb7a9ee..4972d3b5b5d73d41df3cfbbb99543db8fde2eb56 100644 (file)
@@ -21,9 +21,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.felix.dm.Component;
@@ -31,6 +33,7 @@ import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
@@ -52,6 +55,7 @@ import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -63,9 +67,17 @@ import org.slf4j.LoggerFactory;
  * network topology. It provides service for applications to interact with
  * topology database and notifies all the listeners of topology changes.
  */
-public class TopologyManagerImpl implements ITopologyManager,
-IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
-CommandProvider {
+public class TopologyManagerImpl implements
+        ICacheUpdateAware,
+        ITopologyManager,
+        IConfigurationContainerAware,
+        IListenTopoUpdates,
+        IObjectReader,
+        CommandProvider {
+    static final String TOPOEDGESDB = "topologymanager.edgesDB";
+    static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+    static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+    static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
     private static final String SAVE = "Save";
     private ITopologyService topoService;
@@ -79,13 +91,16 @@ CommandProvider {
     // DB of all the NodeConnectors with an Host attached to it
     private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
     // Topology Manager Aware listeners
-    private Set<ITopologyManagerAware> topologyManagerAware =
-            new CopyOnWriteArraySet<ITopologyManagerAware>();;
-
+    private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+    // Topology Manager Aware listeners - for clusterwide updates
+    private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+            new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
     private String userLinksFileName;
     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
-    private ConcurrentMap<Long, String> configSaveEvent;
+    private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private volatile Boolean shuttingDown = false;
+    private Thread notifyThread;
 
 
     void nonClusterObjectCreate() {
@@ -93,7 +108,6 @@ CommandProvider {
         hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
-        configSaveEvent = new ConcurrentHashMap<Long, String>();
     }
 
     void setTopologyManagerAware(ITopologyManagerAware s) {
@@ -110,6 +124,20 @@ CommandProvider {
         }
     }
 
+    void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.add(s);
+        }
+    }
+
+    void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.remove(s);
+        }
+    }
+
     void setTopoService(ITopologyService s) {
         log.debug("Adding ITopologyService: {}", s);
         this.topoService = s;
@@ -140,10 +168,8 @@ CommandProvider {
      *
      */
     void init(Component c) {
-
         allocateCaches();
         retrieveCaches();
-
         String containerName = null;
         Dictionary<?, ?> props = c.getServiceProperties();
         if (props != null) {
@@ -156,61 +182,52 @@ CommandProvider {
         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
         registerWithOSGIConsole();
         loadConfiguration();
+        // Restore the shuttingDown status on init of the component
+        shuttingDown = false;
+        notifyThread = new Thread(new TopologyNotify(notifyQ));
     }
 
     @SuppressWarnings({ "unchecked", "deprecation" })
-    private void allocateCaches(){
-        if (this.clusterContainerService == null) {
-            nonClusterObjectCreate();
-            log.error("Cluster Services unavailable, allocated non-cluster caches!");
-            return;
-        }
-
+    private void allocateCaches() {
         try {
-            this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(
-                    "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.edgesDB =
+                    (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+                            EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode");
+            log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                    .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.hostsDB =
+                    (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
+                            TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode");
+            log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                    .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.nodeConnectorsDB =
+                    (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+                            TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode");
+            log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
-                    .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.userLinksDB =
+                    (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+                            TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode");
+            log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
         }
-
-        try {
-            this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
-                    .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode");
-        }
-
     }
 
     @SuppressWarnings({ "unchecked", "deprecation" })
@@ -220,36 +237,28 @@ CommandProvider {
             return;
         }
 
-        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
-                .getCache("topologymanager.edgesDB");
+        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
         if (edgesDB == null) {
-            log.error("Failed to get cache for topologymanager.edgesDB");
+            log.error("Failed to get cache for " + TOPOEDGESDB);
         }
 
-        this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                .getCache("topologymanager.hostsDB");
+        this.hostsDB =
+                (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
         if (hostsDB == null) {
-            log.error("Failed to get cache for topologymanager.hostsDB");
+            log.error("Failed to get cache for " + TOPOHOSTSDB);
         }
 
-        this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                .getCache("topologymanager.nodeConnectorDB");
+        this.nodeConnectorsDB =
+                (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
         if (nodeConnectorsDB == null) {
-            log.error("Failed to get cache for topologymanager.nodeConnectorDB");
+            log.error("Failed to get cache for " + TOPONODECONNECTORDB);
         }
 
-        this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
-                .getCache("topologymanager.userLinksDB");
+        this.userLinksDB =
+                (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
         if (userLinksDB == null) {
-            log.error("Failed to get cache for topologymanager.userLinksDB");
+            log.error("Failed to get cache for " + TOPOUSERLINKSDB);
         }
-
-        this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
-                .getCache("topologymanager.configSaveEvent");
-        if (configSaveEvent == null) {
-            log.error("Failed to get cache for topologymanager.configSaveEvent");
-        }
-
     }
 
     /**
@@ -258,12 +267,19 @@ CommandProvider {
      *
      */
     void started() {
+        // Start the batcher thread for the cluster wide topology updates
+        notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
         // time it may sollicit refresh too soon.
         log.debug("Sollicit topology refresh");
         topoService.sollicitRefresh();
     }
 
+    void stop() {
+        shuttingDown = true;
+        notifyThread.interrupt();
+    }
+
     /**
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
@@ -271,6 +287,8 @@ CommandProvider {
      *
      */
     void destroy() {
+        notifyQ.clear();
+        notifyThread = null;
     }
 
     @SuppressWarnings("unchecked")
@@ -288,8 +306,6 @@ CommandProvider {
 
     @Override
     public Status saveConfig() {
-        // Publish the save config event to the cluster
-        configSaveEvent.put(new Date().getTime(), SAVE );
         return saveConfigInternal();
     }
 
@@ -436,7 +452,7 @@ CommandProvider {
     @Override
     public Host getHostAttachedToNodeConnector(NodeConnector port) {
         ImmutablePair<Host, Set<Property>> host;
-        if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) {
+        if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
             return null;
         }
         return host.getLeft();
@@ -674,7 +690,7 @@ CommandProvider {
 
         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
         Edge linkTuple;
-        if (link != null && (linkTuple = getLinkTuple(link)) != null) {
+        if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
             if (! isProductionLink(linkTuple)) {
                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
             }
@@ -799,4 +815,92 @@ CommandProvider {
         log.warn("Link Utilization back to normal: {}", edge);
     }
 
+    private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+        TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+        upd.setLocal(isLocal);
+        notifyQ.add(upd);
+    }
+
+    @Override
+    public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            // This is the case of an Edge being added to the topology DB
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+        }
+    }
+
+    @Override
+    public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+            final Set<Property> props = (Set<Property>) new_value;
+            edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+        }
+    }
+
+    @Override
+    public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+        }
+    }
+
+    class TopologyNotify implements Runnable {
+        private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+        private TopoEdgeUpdate entry;
+        private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+        private boolean notifyListeners;
+
+        TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+            this.notifyQ = notifyQ;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    log.trace("New run of TopologyNotify");
+                    notifyListeners = false;
+                    // First we block waiting for an element to get in
+                    entry = notifyQ.take();
+                    // Then we drain the whole queue if elements are
+                    // in it without getting into any blocking condition
+                    for (; entry != null; entry = notifyQ.poll()) {
+                        teuList.add(entry);
+                        notifyListeners = true;
+                    }
+
+                    // Notify listeners only if there were updates drained else
+                    // give up
+                    if (notifyListeners) {
+                        log.trace("Notifier thread, notified a listener");
+                        // Now update the listeners
+                        for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+                            try {
+                                s.edgeUpdate(teuList);
+                            } catch (Exception exc) {
+                                log.error("Exception on edge update:", exc);
+                            }
+                        }
+                    }
+                    teuList.clear();
+
+                    // Lets sleep for sometime to allow aggregation of event
+                    Thread.sleep(100);
+                } catch (InterruptedException e1) {
+                    log.warn("TopologyNotify interrupted {}", e1.getMessage());
+                    if (shuttingDown) {
+                        return;
+                    }
+                } catch (Exception e2) {
+                    log.error("", e2);
+                }
+            }
+        }
+    }
 }