Merge "Implement cluster wide topology notifications and let routing use it"
authorMadhu Venugopal <vmadhu@cisco.com>
Fri, 9 Aug 2013 19:32:32 +0000 (19:32 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 9 Aug 2013 19:32:32 +0000 (19:32 +0000)
opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java
opendaylight/routing/dijkstra_implementation/pom.xml
opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/Activator.java
opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementation.java
opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java [new file with mode: 0644]
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/topology/TopoEdgeUpdate.java
opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java [new file with mode: 0644]
opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java
opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java

index 0a0a9d8828ea3ec6de32739d13e85f99e2dd08bf..f681c35c8c9ea376a3d0399269d798479a296689 100644 (file)
@@ -151,7 +151,7 @@ public class SimpleClient implements CommandProvider {
                 .getILoggerFactory();
         if (lc != null) {
             for (ch.qos.logback.classic.Logger l : lc.getLoggerList()) {
-                if (loggerName == null || l.getName().startsWith(loggerName)) {
+                if ((loggerName == null) || l.getName().startsWith(loggerName)) {
                     ci.println(retrieveLogLevel(l));
                 }
             }
@@ -382,6 +382,7 @@ public class SimpleClient implements CommandProvider {
             ci.println("Cache not supplied");
             return;
         }
+        int count = 0;
         c = (ConcurrentMap<Object, Object>) this.icluster.getCache(containerName, cacheName);
         if (c != null) {
             for (Map.Entry<Object, Object> e : c.entrySet()) {
@@ -394,7 +395,9 @@ public class SimpleClient implements CommandProvider {
                 ci.println("Element " + entry.getKey() + "(hashCode="
                         + entry.getKey().hashCode() + ") has value = (" + res
                         + ")");
+                count++;
             }
+            ci.println("Dumped " + count + " records");
         } else {
             ci.println("Cache " + cacheName + " on container " + containerName
                     + " not existant!");
index 0e45e06cdf234301f048c99799448c4aae385a57..c0f7afa65ce9125b29c1e8aeb2ce4e2793cb9059 100644 (file)
@@ -28,6 +28,7 @@
               org.opendaylight.controller.sal.topology,
               org.opendaylight.controller.sal.utils,
               org.opendaylight.controller.sal.reader,
+              org.opendaylight.controller.clustering.services,
               org.apache.commons.collections15,
               org.opendaylight.controller.switchmanager,
               org.opendaylight.controller.topologymanager,
@@ -35,6 +36,8 @@
               edu.uci.ics.jung.algorithms.shortestpath,
               edu.uci.ics.jung.graph.util,
               org.apache.felix.dm,
+              org.osgi.framework,
+              org.apache.felix.service.command,
               org.junit;resolution:=optional
             </Import-Package>
             <Bundle-Activator>
index 3cd696854c3c9d89a6f6d5543b3ce276f751dfbc..e25c34c75f8c863262053c10167bc70da0279102 100644 (file)
 package org.opendaylight.controller.routing.dijkstra_implementation.internal;
 
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.Set;
+
 import org.apache.felix.dm.Component;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
 import org.opendaylight.controller.sal.routing.IRouting;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
-import org.opendaylight.controller.sal.reader.IReadService;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
-import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 
 public class Activator extends ComponentActivatorAbstractBase {
     protected static final Logger logger = LoggerFactory
@@ -33,6 +36,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * ComponentActivatorAbstractBase.
      *
      */
+    @Override
     public void init() {
     }
 
@@ -41,6 +45,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * cleanup done by ComponentActivatorAbstractBase
      *
      */
+    @Override
     public void destroy() {
 
     }
@@ -54,6 +59,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * instantiated in order to get an fully working implementation
      * Object
      */
+    @Override
     public Object[] getImplementations() {
         Object[] res = { DijkstraImplementation.class };
         return res;
@@ -72,32 +78,39 @@ public class Activator extends ComponentActivatorAbstractBase {
      * also optional per-container different behavior if needed, usually
      * should not be the case though.
      */
-    public void configureInstance(Component c, Object imp, String containerName) {
+    @Override
+    public void configureInstance(final Component c, final Object imp, final String containerName) {
         if (imp.equals(DijkstraImplementation.class)) {
             // export the service
-            Dictionary<String, String> props = new Hashtable<String, String>();
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
             props.put("topoListenerName", "routing.Dijkstra");
-            c.setInterface(new String[] { ITopologyManagerAware.class.getName(),
-                    IRouting.class.getName() }, props);
+
+            c.setInterface(new String[] { ITopologyManagerClusterWideAware.class.getName(), IRouting.class.getName() },
+                    props);
 
             // Now lets add a service dependency to make sure the
             // provider of service exists
-            c.add(createContainerServiceDependency(containerName).setService(
-                    IListenRoutingUpdates.class).setCallbacks(
-                    "setListenRoutingUpdates", "unsetListenRoutingUpdates")
+            c.add(createContainerServiceDependency(containerName).setService(IListenRoutingUpdates.class)
+                    .setCallbacks("setListenRoutingUpdates", "unsetListenRoutingUpdates")
                     .setRequired(false));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    ISwitchManager.class).setCallbacks("setSwitchManager",
-                    "unsetSwitchManager").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(ISwitchManager.class)
+                    .setCallbacks("setSwitchManager", "unsetSwitchManager")
+                    .setRequired(true));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    ITopologyManager.class).setCallbacks("setTopologyManager",
-                    "unsetTopologyManager").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(ITopologyManager.class)
+                    .setCallbacks("setTopologyManager", "unsetTopologyManager")
+                    .setRequired(true));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    IReadService.class).setCallbacks("setReadService",
-                    "unsetReadService").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(IClusterContainerServices.class)
+                    .setCallbacks("setClusterContainerService", "unsetClusterContainerService")
+                    .setRequired(true));
         }
     }
+
+    @Override
+    protected Object[] getGlobalImplementations() {
+        final Object[] res = { DijkstraImplementationCLI.class };
+        return res;
+    }
 }
index 3a0faa91d4bbca4018d18022ac2418b2e8fa8804..a8ef381528d5a9fc8f839e40427e4e7f9e9d27de 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.opendaylight.controller.routing.dijkstra_implementation.internal;
 
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Edge;
@@ -24,19 +25,18 @@ import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Path;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
-import org.opendaylight.controller.sal.reader.IReadService;
 import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
-
 import org.opendaylight.controller.sal.routing.IRouting;
 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
-import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 
 import edu.uci.ics.jung.algorithms.shortestpath.DijkstraShortestPath;
 import edu.uci.ics.jung.graph.Graph;
 import edu.uci.ics.jung.graph.SparseMultigraph;
 import edu.uci.ics.jung.graph.util.EdgeType;
+
 import java.lang.Exception;
 import java.lang.IllegalArgumentException;
 import java.util.HashSet;
@@ -47,23 +47,24 @@ import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.collections15.Transformer;
 
-public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
-    private static Logger log = LoggerFactory
-            .getLogger(DijkstraImplementation.class);
+@SuppressWarnings("rawtypes")
+public class DijkstraImplementation implements IRouting, ITopologyManagerClusterWideAware {
+    private static Logger log = LoggerFactory.getLogger(DijkstraImplementation.class);
     private ConcurrentMap<Short, Graph<Node, Edge>> topologyBWAware;
     private ConcurrentMap<Short, DijkstraShortestPath<Node, Edge>> sptBWAware;
     DijkstraShortestPath<Node, Edge> mtp; // Max Throughput Path
     private Set<IListenRoutingUpdates> routingAware;
     private ISwitchManager switchManager;
     private ITopologyManager topologyManager;
-    private IReadService readService;
     private static final long DEFAULT_LINK_SPEED = Bandwidth.BW1Gbps;
+    private IClusterContainerServices clusterContainerService;
 
-    public void setListenRoutingUpdates(IListenRoutingUpdates i) {
+    public void setListenRoutingUpdates(final IListenRoutingUpdates i) {
         if (this.routingAware == null) {
             this.routingAware = new HashSet<IListenRoutingUpdates>();
         }
@@ -73,7 +74,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         }
     }
 
-    public void unsetListenRoutingUpdates(IListenRoutingUpdates i) {
+    public void unsetListenRoutingUpdates(final IListenRoutingUpdates i) {
         if (this.routingAware == null) {
             return;
         }
@@ -95,6 +96,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         Transformer<Edge, ? extends Number> mtTransformer = null;
         if (EdgeWeightMap == null) {
             mtTransformer = new Transformer<Edge, Double>() {
+                @Override
                 public Double transform(Edge e) {
                     if (switchManager == null) {
                         log.error("switchManager is null");
@@ -106,43 +108,39 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                         log.error("srcNC:{} or dstNC:{} is null", srcNC, dstNC);
                         return (double) -1;
                     }
-                    Bandwidth bwSrc = (Bandwidth) switchManager
-                            .getNodeConnectorProp(srcNC,
-                                    Bandwidth.BandwidthPropName);
-                    Bandwidth bwDst = (Bandwidth) switchManager
-                            .getNodeConnectorProp(dstNC,
-                                    Bandwidth.BandwidthPropName);
+                    Bandwidth bwSrc = (Bandwidth) switchManager.getNodeConnectorProp(srcNC,
+                            Bandwidth.BandwidthPropName);
+                    Bandwidth bwDst = (Bandwidth) switchManager.getNodeConnectorProp(dstNC,
+                            Bandwidth.BandwidthPropName);
 
                     long srcLinkSpeed = 0, dstLinkSpeed = 0;
-                    if ((bwSrc == null)
-                            || ((srcLinkSpeed = bwSrc.getValue()) == 0)) {
-                        log.debug(
-                                "srcNC: {} - Setting srcLinkSpeed to Default!",
-                                srcNC);
+                    if ((bwSrc == null) || ((srcLinkSpeed = bwSrc.getValue()) == 0)) {
+                        log.debug("srcNC: {} - Setting srcLinkSpeed to Default!", srcNC);
                         srcLinkSpeed = DEFAULT_LINK_SPEED;
                     }
 
-                    if ((bwDst == null)
-                            || ((dstLinkSpeed = bwDst.getValue()) == 0)) {
-                        log.debug(
-                                "dstNC: {} - Setting dstLinkSpeed to Default!",
-                                dstNC);
+                    if ((bwDst == null) || ((dstLinkSpeed = bwDst.getValue()) == 0)) {
+                        log.debug("dstNC: {} - Setting dstLinkSpeed to Default!", dstNC);
                         dstLinkSpeed = DEFAULT_LINK_SPEED;
                     }
 
-                    long avlSrcThruPut = srcLinkSpeed
-                            - readService.getTransmitRate(srcNC);
-                    long avlDstThruPut = dstLinkSpeed
-                            - readService.getTransmitRate(dstNC);
-
-                    // Use lower of the 2 available thruput as the available
-                    // thruput
-                    long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut
-                            : avlDstThruPut;
+                    // TODO: revisit the logic below with the real use case in
+                    // mind
+                    // For now we assume the throughput to be the speed of the
+                    // link itself
+                    // this kind of logic require information that should be
+                    // polled by statistic manager and are not yet available,
+                    // also this service at the moment is not used, so to be
+                    // revisited later on
+                    long avlSrcThruPut = srcLinkSpeed;
+                    long avlDstThruPut = dstLinkSpeed;
+
+                    // Use lower of the 2 available throughput as the available
+                    // throughput
+                    long avlThruPut = avlSrcThruPut < avlDstThruPut ? avlSrcThruPut : avlDstThruPut;
 
                     if (avlThruPut <= 0) {
-                        log.debug("Edge {}: Available Throughput {} <= 0!", e,
-                                avlThruPut);
+                        log.debug("Edge {}: Available Throughput {} <= 0!", e, avlThruPut);
                         return (double) -1;
                     }
                     return (double) (Bandwidth.BW1Pbps / avlThruPut);
@@ -150,6 +148,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
             };
         } else {
             mtTransformer = new Transformer<Edge, Number>() {
+                @Override
                 public Number transform(Edge e) {
                     return EdgeWeightMap.get(e);
                 }
@@ -166,8 +165,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
     }
 
     @Override
-    public Path getRoute(Node src, Node dst) {
-        if (src == null || dst == null) {
+    public Path getRoute(final Node src, final Node dst) {
+        if ((src == null) || (dst == null)) {
             return null;
         }
         return getRoute(src, dst, (short) 0);
@@ -198,10 +197,11 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
     }
 
     @Override
-    public synchronized Path getRoute(Node src, Node dst, Short Bw) {
+    public synchronized Path getRoute(final Node src, final Node dst, final Short Bw) {
         DijkstraShortestPath<Node, Edge> spt = this.sptBWAware.get(Bw);
-        if (spt == null)
+        if (spt == null) {
             return null;
+        }
         List<Edge> path;
         try {
             path = spt.getPath(src, dst);
@@ -238,8 +238,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         }
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private synchronized boolean updateTopo(Edge edge, Short bw, boolean added) {
+    @SuppressWarnings({ "unchecked" })
+    private synchronized boolean updateTopo(Edge edge, Short bw, UpdateType type) {
         Graph<Node, Edge> topo = this.topologyBWAware.get(bw);
         DijkstraShortestPath<Node, Edge> spt = this.sptBWAware.get(bw);
         boolean edgePresentInGraph = false;
@@ -262,7 +262,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                 this.sptBWAware.put(bw, spt);
             }
 
-            if (added) {
+            switch (type) {
+            case ADDED:
                 // Make sure the vertex are there before adding the edge
                 topo.addVertex(src.getNode());
                 topo.addVertex(dst.getNode());
@@ -270,37 +271,39 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                 edgePresentInGraph = topo.containsEdge(edge);
                 if (edgePresentInGraph == false) {
                     try {
-                        topo.addEdge(new Edge(src, dst), src.getNode(),
-                                dst.getNode(), EdgeType.DIRECTED);
-                    } catch (ConstructionException e) {
+                        topo.addEdge(new Edge(src, dst), src.getNode(), dst.getNode(), EdgeType.DIRECTED);
+                    } catch (final ConstructionException e) {
                         log.error("", e);
                         return edgePresentInGraph;
                     }
                 }
-            } else {
+            case CHANGED:
+                // Mainly raised only on properties update, so not really useful
+                // in this case
+                break;
+            case REMOVED:
                 // Remove the edge
                 try {
                     topo.removeEdge(new Edge(src, dst));
-                } catch (ConstructionException e) {
+                } catch (final ConstructionException e) {
                     log.error("", e);
                     return edgePresentInGraph;
                 }
 
                 // If the src and dst vertex don't have incoming or
                 // outgoing links we can get ride of them
-                if (topo.containsVertex(src.getNode())
-                        && topo.inDegree(src.getNode()) == 0
-                        && topo.outDegree(src.getNode()) == 0) {
+                if (topo.containsVertex(src.getNode()) && (topo.inDegree(src.getNode()) == 0)
+                        && (topo.outDegree(src.getNode()) == 0)) {
                     log.debug("Removing vertex {}", src);
                     topo.removeVertex(src.getNode());
                 }
 
-                if (topo.containsVertex(dst.getNode())
-                        && topo.inDegree(dst.getNode()) == 0
-                        && topo.outDegree(dst.getNode()) == 0) {
+                if (topo.containsVertex(dst.getNode()) && (topo.inDegree(dst.getNode()) == 0)
+                        && (topo.outDegree(dst.getNode()) == 0)) {
                     log.debug("Removing vertex {}", dst);
                     topo.removeVertex(dst.getNode());
                 }
+                break;
             }
             spt.reset();
             if (bw.equals(baseBW)) {
@@ -312,11 +315,13 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         return edgePresentInGraph;
     }
 
-    private boolean edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
+    private boolean edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean local) {
         String srcType = null;
         String dstType = null;
 
-        if (e == null || type == null) {
+        log.trace("Got an edgeUpdate: {} props: {} update type: {} local: {}", new Object[] { e, props, type, local });
+
+        if ((e == null) || (type == null)) {
             log.error("Edge or Update type are null!");
             return false;
         } else {
@@ -336,21 +341,17 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
 
         Bandwidth bw = new Bandwidth(0);
         boolean newEdge = false;
-        if (props != null)
+        if (props != null) {
             props.remove(bw);
-
-        if (log.isDebugEnabled()) {
-          log.debug("edgeUpdate: {} bw: {}", e, bw.getValue());
         }
 
         Short baseBW = Short.valueOf((short) 0);
-        boolean add = (type == UpdateType.ADDED) ? true : false;
         // Update base topo
-        newEdge = !updateTopo(e, baseBW, add);
+        newEdge = !updateTopo(e, baseBW, type);
         if (newEdge == true) {
             if (bw.getValue() != baseBW) {
                 // Update BW topo
-                updateTopo(e, (short) bw.getValue(), add);
+                updateTopo(e, (short) bw.getValue(), type);
             }
         }
         return newEdge;
@@ -358,16 +359,30 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
 
     @Override
     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        log.trace("Start of a Bulk EdgeUpdate with " + topoedgeupdateList.size() + " elements");
         boolean callListeners = false;
         for (int i = 0; i < topoedgeupdateList.size(); i++) {
             Edge e = topoedgeupdateList.get(i).getEdge();
-            Set<Property> p = topoedgeupdateList.get(i).getProperty();
-            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
-            if ((edgeUpdate(e, type, p)) && (!callListeners)) {
+            Set<Property> p = topoedgeupdateList.get(i)
+                    .getProperty();
+            UpdateType type = topoedgeupdateList.get(i)
+                    .getUpdateType();
+            boolean isLocal = topoedgeupdateList.get(i)
+                    .isLocal();
+            if ((edgeUpdate(e, type, p, isLocal)) && (!callListeners)) {
                 callListeners = true;
             }
         }
-        if ((callListeners) && (this.routingAware != null)) {
+
+        // The routing listeners should only be called on the coordinator, to
+        // avoid multiple controller cluster nodes to actually do the
+        // recalculation when only one need to react
+        boolean amICoordinator = true;
+        if (this.clusterContainerService != null) {
+            amICoordinator = this.clusterContainerService.amICoordinator();
+        }
+        if ((callListeners) && (this.routingAware != null) && amICoordinator) {
+            log.trace("Calling the routing listeners");
             for (IListenRoutingUpdates ra : this.routingAware) {
                 try {
                     ra.recalculateDone();
@@ -376,6 +391,7 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
                 }
             }
         }
+        log.trace("End of a Bulk EdgeUpdate");
     }
 
     /**
@@ -386,8 +402,8 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public void init() {
         log.debug("Routing init() is called");
-        this.topologyBWAware = (ConcurrentMap<Short, Graph<Node, Edge>>) new ConcurrentHashMap();
-        this.sptBWAware = (ConcurrentMap<Short, DijkstraShortestPath<Node, Edge>>) new ConcurrentHashMap();
+        this.topologyBWAware = new ConcurrentHashMap<Short, Graph<Node, Edge>>();
+        this.sptBWAware = new ConcurrentHashMap<Short, DijkstraShortestPath<Node, Edge>>();
         // Now create the default topology, which doesn't consider the
         // BW, also create the corresponding Dijkstra calculation
         Graph<Node, Edge> g = new SparseMultigraph();
@@ -443,18 +459,6 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         log.debug("Routing stop() is called");
     }
 
-    @Override
-    public void edgeOverUtilized(Edge edge) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void edgeUtilBackToNormal(Edge edge) {
-        // TODO Auto-generated method stub
-
-    }
-
     public void setSwitchManager(ISwitchManager switchManager) {
         this.switchManager = switchManager;
     }
@@ -465,16 +469,6 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
         }
     }
 
-    public void setReadService(IReadService readService) {
-        this.readService = readService;
-    }
-
-    public void unsetReadService(IReadService readService) {
-        if (this.readService == readService) {
-            this.readService = null;
-        }
-    }
-
     public void setTopologyManager(ITopologyManager tm) {
         this.topologyManager = tm;
     }
@@ -484,4 +478,28 @@ public class DijkstraImplementation implements IRouting, ITopologyManagerAware {
             this.topologyManager = null;
         }
     }
+
+    void setClusterContainerService(IClusterContainerServices s) {
+        log.debug("Cluster Service set");
+        this.clusterContainerService = s;
+    }
+
+    void unsetClusterContainerService(IClusterContainerServices s) {
+        if (this.clusterContainerService == s) {
+            log.debug("Cluster Service removed!");
+            this.clusterContainerService = null;
+        }
+    }
+
+    @Override
+    public void edgeOverUtilized(Edge edge) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void edgeUtilBackToNormal(Edge edge) {
+        // TODO Auto-generated method stub
+
+    }
 }
diff --git a/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java b/opendaylight/routing/dijkstra_implementation/src/main/java/org/opendaylight/controller/routing/dijkstra_implementation/internal/DijkstraImplementationCLI.java
new file mode 100644 (file)
index 0000000..a6645cc
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.routing.dijkstra_implementation.internal;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.service.command.Descriptor;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.core.Path;
+import org.opendaylight.controller.sal.routing.IRouting;
+import org.opendaylight.controller.sal.utils.ServiceHelper;
+import org.osgi.framework.ServiceRegistration;
+
+public class DijkstraImplementationCLI {
+    @SuppressWarnings("rawtypes")
+    private ServiceRegistration sr = null;
+
+    public void init() {
+    }
+
+    public void destroy() {
+    }
+
+    public void start() {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("osgi.command.scope", "odpcontroller");
+        props.put("osgi.command.function", new String[] { "getRoute" });
+        this.sr = ServiceHelper.registerGlobalServiceWReg(DijkstraImplementationCLI.class, this, props);
+    }
+
+    public void stop() {
+        if (this.sr != null) {
+            this.sr.unregister();
+            this.sr = null;
+        }
+    }
+
+    @Descriptor("Retrieves a Route between two Nodes in the discovered Topology DB")
+    public void getRoute(
+            @Descriptor("Container on the context of which the routing service need to be looked up") String container,
+            @Descriptor("String representation of the Source Node, this need to be consumable from Node.fromString()") String srcNode,
+            @Descriptor("String representation of the Destination Node") String dstNode) {
+        final IRouting r = (IRouting) ServiceHelper.getInstance(IRouting.class, container, this);
+
+        if (r == null) {
+            System.out.println("Cannot find the routing instance on container:" + container);
+            return;
+        }
+
+        final Node src = Node.fromString(srcNode);
+        final Node dst = Node.fromString(dstNode);
+        final Path p = r.getRoute(src, dst);
+        if (p != null) {
+            System.out.println("Route between srcNode:" + src + " and dstNode:" + dst + " = " + p);
+        } else {
+            System.out.println("There is no route between srcNode:" + src + " and dstNode:" + dst);
+        }
+    }
+}
index 58bf350c40e015878cad1ae01f9de3ca9e7f7596..0208cc7cdac4693a2ffafb42a06d54b0e11e6e71 100644 (file)
@@ -17,16 +17,29 @@ import org.opendaylight.controller.sal.core.UpdateType;
 /**
  * The class represents an Edge, the Edge's Property Set and its UpdateType.
  */
-
 public class TopoEdgeUpdate {
     private Edge edge;
     private Set<Property> props;
     private UpdateType type;
+    private boolean isLocal;
 
+    /**
+     * Constructor for a topology element update. A TopologyElementUpdate is an
+     * object that summarize what has happened on an Edge and if the update is
+     * generated locally to this controller or no
+     *
+     * @param e
+     *            Edge being updated
+     * @param p
+     *            Set of Properties attached to the edge
+     * @param t
+     *            Type of update
+     */
     public TopoEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
         edge = e;
         props = p;
         type = t;
+        setLocal(true);
     }
 
     public Edge getEdge() {
@@ -45,39 +58,46 @@ public class TopoEdgeUpdate {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((edge == null) ? 0 : edge.hashCode());
-        result = prime * result + ((props == null) ? 0 : props.hashCode());
-        result = prime * result + ((type == null) ? 0 : type.hashCode());
+        result = (prime * result) + ((edge == null) ? 0 : edge.hashCode());
+        result = (prime * result) + ((type == null) ? 0 : type.hashCode());
         return result;
     }
 
     @Override
     public String toString() {
-        return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type="
-                + type + "]";
+        return "TopoEdgeUpdate [edge=" + edge + ", props=" + props + ", type=" + type + ", isLocal=" + isLocal + "]";
     }
 
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         TopoEdgeUpdate other = (TopoEdgeUpdate) obj;
         if (edge == null) {
-            if (other.edge != null)
-                return false;
-        } else if (!edge.equals(other.edge))
-            return false;
-        if (props == null) {
-            if (other.props != null)
+            if (other.edge != null) {
                 return false;
-        } else if (!props.equals(other.props))
+            }
+        } else if (!edge.equals(other.edge)) {
             return false;
-        if (type != other.type)
+        }
+        if (type != other.type) {
             return false;
+        }
         return true;
     }
+
+    public boolean isLocal() {
+        return isLocal;
+    }
+
+    public void setLocal(boolean isLocal) {
+        this.isLocal = isLocal;
+    }
 }
diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerClusterWideAware.java
new file mode 100644 (file)
index 0000000..aa47b5f
--- /dev/null
@@ -0,0 +1,11 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.topologymanager;
+
+public interface ITopologyManagerClusterWideAware extends ITopologyManagerAware {
+}
index 0da1a2ee39ab73255aca9149095f15ceb6ac8c45..d8ff141edea4da61492cd9e79bbc76d6acede158 100644 (file)
@@ -15,6 +15,7 @@ import java.util.Hashtable;
 import java.util.Set;
 
 import org.apache.felix.dm.Component;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
@@ -22,6 +23,7 @@ import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * ComponentActivatorAbstractBase.
      *
      */
+    @Override
     public void init() {
 
     }
@@ -44,6 +47,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * cleanup done by ComponentActivatorAbstractBase
      *
      */
+    @Override
     public void destroy() {
 
     }
@@ -57,6 +61,7 @@ public class Activator extends ComponentActivatorAbstractBase {
      * instantiated in order to get an fully working implementation
      * Object
      */
+    @Override
     public Object[] getImplementations() {
         Object[] res = { TopologyManagerImpl.class };
         return res;
@@ -75,17 +80,19 @@ public class Activator extends ComponentActivatorAbstractBase {
      * also optional per-container different behavior if needed, usually
      * should not be the case though.
      */
+    @Override
     public void configureInstance(Component c, Object imp, String containerName) {
         if (imp.equals(TopologyManagerImpl.class)) {
             // export the service needed to listen to topology updates
             Dictionary<String, Set<String>> props = new Hashtable<String, Set<String>>();
             Set<String> propSet = new HashSet<String>();
-            propSet.add("topologymanager.configSaveEvent");
+            propSet.add(TopologyManagerImpl.TOPOEDGESDB);
             props.put("cachenames", propSet);
 
             c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
                     ITopologyManager.class.getName(),
-                    IConfigurationContainerAware.class.getName() }, props);
+                    IConfigurationContainerAware.class.getName(),
+                    ICacheUpdateAware.class.getName() }, props);
 
             c.add(createContainerServiceDependency(containerName).setService(
                     ITopologyService.class).setCallbacks("setTopoService",
@@ -98,6 +105,13 @@ public class Activator extends ComponentActivatorAbstractBase {
                     "setTopologyManagerAware", "unsetTopologyManagerAware")
                     .setRequired(false));
 
+            // These are all the listeners for a topology manager for the
+            // cluster wide events
+            // updates, there could be many or none
+            c.add(createContainerServiceDependency(containerName).setService(ITopologyManagerClusterWideAware.class)
+                    .setCallbacks("setTopologyManagerClusterWideAware", "unsetTopologyManagerClusterWideAware")
+                    .setRequired(false));
+
             c.add(createContainerServiceDependency(containerName).setService(
                     IClusterContainerServices.class).setCallbacks(
                     "setClusterContainerService",
index a043cbe92595ff011483296bc1fc27be6bb7a9ee..4972d3b5b5d73d41df3cfbbb99543db8fde2eb56 100644 (file)
@@ -21,9 +21,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.felix.dm.Component;
@@ -31,6 +33,7 @@ import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
@@ -52,6 +55,7 @@ import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -63,9 +67,17 @@ import org.slf4j.LoggerFactory;
  * network topology. It provides service for applications to interact with
  * topology database and notifies all the listeners of topology changes.
  */
-public class TopologyManagerImpl implements ITopologyManager,
-IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
-CommandProvider {
+public class TopologyManagerImpl implements
+        ICacheUpdateAware,
+        ITopologyManager,
+        IConfigurationContainerAware,
+        IListenTopoUpdates,
+        IObjectReader,
+        CommandProvider {
+    static final String TOPOEDGESDB = "topologymanager.edgesDB";
+    static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+    static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+    static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
     private static final String SAVE = "Save";
     private ITopologyService topoService;
@@ -79,13 +91,16 @@ CommandProvider {
     // DB of all the NodeConnectors with an Host attached to it
     private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
     // Topology Manager Aware listeners
-    private Set<ITopologyManagerAware> topologyManagerAware =
-            new CopyOnWriteArraySet<ITopologyManagerAware>();;
-
+    private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+    // Topology Manager Aware listeners - for clusterwide updates
+    private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+            new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
     private String userLinksFileName;
     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
-    private ConcurrentMap<Long, String> configSaveEvent;
+    private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private volatile Boolean shuttingDown = false;
+    private Thread notifyThread;
 
 
     void nonClusterObjectCreate() {
@@ -93,7 +108,6 @@ CommandProvider {
         hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
-        configSaveEvent = new ConcurrentHashMap<Long, String>();
     }
 
     void setTopologyManagerAware(ITopologyManagerAware s) {
@@ -110,6 +124,20 @@ CommandProvider {
         }
     }
 
+    void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.add(s);
+        }
+    }
+
+    void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.remove(s);
+        }
+    }
+
     void setTopoService(ITopologyService s) {
         log.debug("Adding ITopologyService: {}", s);
         this.topoService = s;
@@ -140,10 +168,8 @@ CommandProvider {
      *
      */
     void init(Component c) {
-
         allocateCaches();
         retrieveCaches();
-
         String containerName = null;
         Dictionary<?, ?> props = c.getServiceProperties();
         if (props != null) {
@@ -156,61 +182,52 @@ CommandProvider {
         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
         registerWithOSGIConsole();
         loadConfiguration();
+        // Restore the shuttingDown status on init of the component
+        shuttingDown = false;
+        notifyThread = new Thread(new TopologyNotify(notifyQ));
     }
 
     @SuppressWarnings({ "unchecked", "deprecation" })
-    private void allocateCaches(){
-        if (this.clusterContainerService == null) {
-            nonClusterObjectCreate();
-            log.error("Cluster Services unavailable, allocated non-cluster caches!");
-            return;
-        }
-
+    private void allocateCaches() {
         try {
-            this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(
-                    "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.edgesDB =
+                    (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+                            EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode");
+            log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                    .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.hostsDB =
+                    (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
+                            TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode");
+            log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                    .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.nodeConnectorsDB =
+                    (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+                            TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode");
+            log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
-                    .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.userLinksDB =
+                    (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+                            TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode");
+            log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
         }
-
-        try {
-            this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
-                    .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode");
-        }
-
     }
 
     @SuppressWarnings({ "unchecked", "deprecation" })
@@ -220,36 +237,28 @@ CommandProvider {
             return;
         }
 
-        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
-                .getCache("topologymanager.edgesDB");
+        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
         if (edgesDB == null) {
-            log.error("Failed to get cache for topologymanager.edgesDB");
+            log.error("Failed to get cache for " + TOPOEDGESDB);
         }
 
-        this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                .getCache("topologymanager.hostsDB");
+        this.hostsDB =
+                (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
         if (hostsDB == null) {
-            log.error("Failed to get cache for topologymanager.hostsDB");
+            log.error("Failed to get cache for " + TOPOHOSTSDB);
         }
 
-        this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                .getCache("topologymanager.nodeConnectorDB");
+        this.nodeConnectorsDB =
+                (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
         if (nodeConnectorsDB == null) {
-            log.error("Failed to get cache for topologymanager.nodeConnectorDB");
+            log.error("Failed to get cache for " + TOPONODECONNECTORDB);
         }
 
-        this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
-                .getCache("topologymanager.userLinksDB");
+        this.userLinksDB =
+                (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
         if (userLinksDB == null) {
-            log.error("Failed to get cache for topologymanager.userLinksDB");
+            log.error("Failed to get cache for " + TOPOUSERLINKSDB);
         }
-
-        this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
-                .getCache("topologymanager.configSaveEvent");
-        if (configSaveEvent == null) {
-            log.error("Failed to get cache for topologymanager.configSaveEvent");
-        }
-
     }
 
     /**
@@ -258,12 +267,19 @@ CommandProvider {
      *
      */
     void started() {
+        // Start the batcher thread for the cluster wide topology updates
+        notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
         // time it may sollicit refresh too soon.
         log.debug("Sollicit topology refresh");
         topoService.sollicitRefresh();
     }
 
+    void stop() {
+        shuttingDown = true;
+        notifyThread.interrupt();
+    }
+
     /**
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
@@ -271,6 +287,8 @@ CommandProvider {
      *
      */
     void destroy() {
+        notifyQ.clear();
+        notifyThread = null;
     }
 
     @SuppressWarnings("unchecked")
@@ -288,8 +306,6 @@ CommandProvider {
 
     @Override
     public Status saveConfig() {
-        // Publish the save config event to the cluster
-        configSaveEvent.put(new Date().getTime(), SAVE );
         return saveConfigInternal();
     }
 
@@ -436,7 +452,7 @@ CommandProvider {
     @Override
     public Host getHostAttachedToNodeConnector(NodeConnector port) {
         ImmutablePair<Host, Set<Property>> host;
-        if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) {
+        if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
             return null;
         }
         return host.getLeft();
@@ -674,7 +690,7 @@ CommandProvider {
 
         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
         Edge linkTuple;
-        if (link != null && (linkTuple = getLinkTuple(link)) != null) {
+        if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
             if (! isProductionLink(linkTuple)) {
                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
             }
@@ -799,4 +815,92 @@ CommandProvider {
         log.warn("Link Utilization back to normal: {}", edge);
     }
 
+    private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+        TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+        upd.setLocal(isLocal);
+        notifyQ.add(upd);
+    }
+
+    @Override
+    public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            // This is the case of an Edge being added to the topology DB
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+        }
+    }
+
+    @Override
+    public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+            final Set<Property> props = (Set<Property>) new_value;
+            edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+        }
+    }
+
+    @Override
+    public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+        }
+    }
+
+    class TopologyNotify implements Runnable {
+        private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+        private TopoEdgeUpdate entry;
+        private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+        private boolean notifyListeners;
+
+        TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+            this.notifyQ = notifyQ;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    log.trace("New run of TopologyNotify");
+                    notifyListeners = false;
+                    // First we block waiting for an element to get in
+                    entry = notifyQ.take();
+                    // Then we drain the whole queue if elements are
+                    // in it without getting into any blocking condition
+                    for (; entry != null; entry = notifyQ.poll()) {
+                        teuList.add(entry);
+                        notifyListeners = true;
+                    }
+
+                    // Notify listeners only if there were updates drained else
+                    // give up
+                    if (notifyListeners) {
+                        log.trace("Notifier thread, notified a listener");
+                        // Now update the listeners
+                        for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+                            try {
+                                s.edgeUpdate(teuList);
+                            } catch (Exception exc) {
+                                log.error("Exception on edge update:", exc);
+                            }
+                        }
+                    }
+                    teuList.clear();
+
+                    // Lets sleep for sometime to allow aggregation of event
+                    Thread.sleep(100);
+                } catch (InterruptedException e1) {
+                    log.warn("TopologyNotify interrupted {}", e1.getMessage());
+                    if (shuttingDown) {
+                        return;
+                    }
+                } catch (Exception e2) {
+                    log.error("", e2);
+                }
+            }
+        }
+    }
 }