Adding code to handle multiple hosts per node connector
[controller.git] / opendaylight / topologymanager / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
index 9d24cc6fa89994a7014b15458010631e6ebb74f5..4bd295c62a2bc641ca877136b26992222ef39e4e 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -12,16 +11,21 @@ package org.opendaylight.controller.topologymanager.internal;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.felix.dm.Component;
@@ -29,10 +33,10 @@ import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Edge;
 import org.opendaylight.controller.sal.core.Host;
 import org.opendaylight.controller.sal.core.Node;
@@ -42,15 +46,16 @@ import org.opendaylight.controller.sal.core.TimeStamp;
 import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
-import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
-import org.opendaylight.controller.sal.utils.HexEncode;
 import org.opendaylight.controller.sal.utils.IObjectReader;
 import org.opendaylight.controller.sal.utils.ObjectReader;
 import org.opendaylight.controller.sal.utils.ObjectWriter;
 import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -62,55 +67,85 @@ import org.slf4j.LoggerFactory;
  * network topology. It provides service for applications to interact with
  * topology database and notifies all the listeners of topology changes.
  */
-public class TopologyManagerImpl implements ITopologyManager,
-        IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
+public class TopologyManagerImpl implements
+        ICacheUpdateAware,
+        ITopologyManager,
+        IConfigurationContainerAware,
+        IListenTopoUpdates,
+        IObjectReader,
         CommandProvider {
-    private static final Logger log = LoggerFactory
-            .getLogger(TopologyManagerImpl.class);
-    private ITopologyService topoService = null;
-    private IClusterContainerServices clusterContainerService = null;
+    static final String TOPOEDGESDB = "topologymanager.edgesDB";
+    static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+    static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+    static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
+    private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+    private static final String SAVE = "Save";
+    private ITopologyService topoService;
+    private IClusterContainerServices clusterContainerService;
     // DB of all the Edges with properties which constitute our topology
-    private ConcurrentMap<Edge, Set<Property>> edgesDB = null;
-    // DB of all NodeConnector which are part of Edges, meaning they
-    // are connected to another NodeConnector on the other side
-    private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB = null;
+    private ConcurrentMap<Edge, Set<Property>> edgesDB;
+    // DB of all NodeConnector which are part of ISL Edges, meaning they
+    // are connected to another NodeConnector on the other side of an ISL link.
+    // NodeConnector of a Production Edge is not part of this DB.
+    private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
     // DB of all the NodeConnectors with an Host attached to it
-    private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB = null;
+    private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
     // Topology Manager Aware listeners
-    private Set<ITopologyManagerAware> topologyManagerAware = Collections
-            .synchronizedSet(new HashSet<ITopologyManagerAware>());
-
+    private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+    // Topology Manager Aware listeners - for clusterwide updates
+    private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+            new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
-    private String userLinksFileName = null;
-    private ConcurrentMap<String, TopologyUserLinkConfig> userLinks;
-    
+    private String userLinksFileName;
+    private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
+    private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private volatile Boolean shuttingDown = false;
+    private Thread notifyThread;
+
+
     void nonClusterObjectCreate() {
-       edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
-       hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
-       userLinks = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
-       nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
+        edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
+        hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
+        nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
+        userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
     }
-    
 
     void setTopologyManagerAware(ITopologyManagerAware s) {
         if (this.topologyManagerAware != null) {
-               log.debug("Adding ITopologyManagerAware: " + s);
+            log.debug("Adding ITopologyManagerAware: {}", s);
             this.topologyManagerAware.add(s);
         }
     }
 
     void unsetTopologyManagerAware(ITopologyManagerAware s) {
         if (this.topologyManagerAware != null) {
+            log.debug("Removing ITopologyManagerAware: {}", s);
             this.topologyManagerAware.remove(s);
         }
     }
 
+    void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.add(s);
+        }
+    }
+
+    void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.remove(s);
+        }
+    }
+
     void setTopoService(ITopologyService s) {
+        log.debug("Adding ITopologyService: {}", s);
         this.topoService = s;
     }
 
     void unsetTopoService(ITopologyService s) {
         if (this.topoService == s) {
+            log.debug("Removing ITopologyService: {}", s);
             this.topoService = null;
         }
     }
@@ -133,8 +168,10 @@ public class TopologyManagerImpl implements ITopologyManager,
      *
      */
     void init(Component c) {
+        allocateCaches();
+        retrieveCaches();
         String containerName = null;
-        Dictionary props = c.getServiceProperties();
+        Dictionary<?, ?> props = c.getServiceProperties();
         if (props != null) {
             containerName = (String) props.get("containerName");
         } else {
@@ -142,134 +179,146 @@ public class TopologyManagerImpl implements ITopologyManager,
             containerName = "UNKNOWN";
         }
 
-        if (this.clusterContainerService == null) {
-            log.error("Cluster Services is null, not expected!");
-            return;
-        }
+        userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
+        registerWithOSGIConsole();
+        loadConfiguration();
+        // Restore the shuttingDown status on init of the component
+        shuttingDown = false;
+        notifyThread = new Thread(new TopologyNotify(notifyQ));
+    }
 
-        if (this.topoService == null) {
-            log.error("Topology Services is null, not expected!");
-            return;
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void allocateCaches() {
+        try {
+            this.edgesDB =
+                    (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+                            EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+        } catch (CacheExistException cee) {
+            log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
+        } catch (CacheConfigException cce) {
+            log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
-                    .createCache("topologymanager.edgesDB", EnumSet
-                            .of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.hostsDB =
+                    (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
+                            TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.error("topologymanager.edgesDB Cache already exists - "
-                    + "destroy and recreate if needed");
+            log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.edgesDB Cache configuration invalid - "
-                    + "check cache mode");
+            log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                    .createCache("topologymanager.hostsDB", EnumSet
-                            .of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.nodeConnectorsDB =
+                    (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+                            TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.error("topologymanager.hostsDB Cache already exists - "
-                    + "destroy and recreate if needed");
+            log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.hostsDB Cache configuration invalid - "
-                    + "check cache mode");
+            log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                    .createCache("topologymanager.nodeConnectorDB", EnumSet
-                            .of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.userLinksDB =
+                    (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+                            TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.error("topologymanager.nodeConnectorDB Cache already exists"
-                    + " - destroy and recreate if needed");
+            log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.nodeConnectorDB Cache configuration "
-                    + "invalid - check cache mode");
+            log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
         }
+    }
 
-        userLinks = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void retrieveCaches() {
+        if (this.clusterContainerService == null) {
+            log.error("Cluster Services is null, can't retrieve caches.");
+            return;
+        }
 
-        userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
-        registerWithOSGIConsole();
-        loadConfiguration();
+        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
+        if (edgesDB == null) {
+            log.error("Failed to get cache for " + TOPOEDGESDB);
+        }
+
+        this.hostsDB =
+                (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
+        if (hostsDB == null) {
+            log.error("Failed to get cache for " + TOPOHOSTSDB);
+        }
+
+        this.nodeConnectorsDB =
+                (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
+        if (nodeConnectorsDB == null) {
+            log.error("Failed to get cache for " + TOPONODECONNECTORDB);
+        }
+
+        this.userLinksDB =
+                (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
+        if (userLinksDB == null) {
+            log.error("Failed to get cache for " + TOPOUSERLINKSDB);
+        }
     }
 
     /**
-     * Function called after the topology manager has registered the
-     * service in OSGi service registry.
+     * Function called after the topology manager has registered the service in
+     * OSGi service registry.
      *
      */
     void started() {
+        // Start the batcher thread for the cluster wide topology updates
+        notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
         // time it may sollicit refresh too soon.
         log.debug("Sollicit topology refresh");
         topoService.sollicitRefresh();
     }
 
+    void stop() {
+        shuttingDown = true;
+        notifyThread.interrupt();
+    }
+
     /**
-     * Function called by the dependency manager when at least one
-     * dependency become unsatisfied or when the component is shutting
-     * down because for example bundle is being stopped.
+     * Function called by the dependency manager when at least one dependency
+     * become unsatisfied or when the component is shutting down because for
+     * example bundle is being stopped.
      *
      */
     void destroy() {
-        if (this.clusterContainerService == null) {
-            log.error("Cluster Services is null, not expected!");
-            this.edgesDB = null;
-            this.hostsDB = null;
-            this.nodeConnectorsDB = null;
-            return;
-        }
-        this.clusterContainerService.destroyCache("topologymanager.edgesDB");
-        this.edgesDB = null;
-        this.clusterContainerService.destroyCache("topologymanager.hostsDB");
-        this.hostsDB = null;
-        this.clusterContainerService
-                .destroyCache("topologymanager.nodeConnectorDB");
-        this.nodeConnectorsDB = null;
-        log.debug("Topology Manager DB DE-allocated");
+        notifyQ.clear();
+        notifyThread = null;
     }
 
     @SuppressWarnings("unchecked")
     private void loadConfiguration() {
         ObjectReader objReader = new ObjectReader();
-        ConcurrentMap<String, TopologyUserLinkConfig> confList = (ConcurrentMap<String, TopologyUserLinkConfig>) objReader
-                .read(this, userLinksFileName);
+        ConcurrentMap<String, TopologyUserLinkConfig> confList =
+                (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
 
-        if (confList == null) {
-            return;
-        }
-
-        for (TopologyUserLinkConfig conf : confList.values()) {
-            addUserLink(conf);
+        if (confList != null) {
+            for (TopologyUserLinkConfig conf : confList.values()) {
+                addUserLink(conf);
+            }
         }
     }
 
     @Override
     public Status saveConfig() {
-        // Publish the save config event to the cluster nodes
-        /**
-         * Get the CLUSTERING SERVICES WORKING BEFORE TRYING THIS
-
-        configSaveEvent.put(new Date().getTime(), SAVE);
-         */
         return saveConfigInternal();
     }
 
     public Status saveConfigInternal() {
-       Status retS;
         ObjectWriter objWriter = new ObjectWriter();
 
-        retS = objWriter.write(
-                new ConcurrentHashMap<String, TopologyUserLinkConfig>(
-                        userLinks), userLinksFileName);
+        Status saveStatus = objWriter.write(
+                new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
 
-        if (retS.isSuccess()) {
-            return retS;
-        } else {
-            return new Status(StatusCode.INTERNALERROR, "Save failed");
+        if (! saveStatus.isSuccess()) {
+            return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
         }
+        return saveStatus;
     }
 
     @Override
@@ -278,31 +327,25 @@ public class TopologyManagerImpl implements ITopologyManager,
             return null;
         }
 
-        HashMap<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
-        for (Edge key : this.edgesDB.keySet()) {
+        Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
+        for (Edge edge : this.edgesDB.keySet()) {
             // Lets analyze the tail
-            Node node = key.getTailNodeConnector().getNode();
+            Node node = edge.getTailNodeConnector().getNode();
             Set<Edge> nodeEdges = res.get(node);
             if (nodeEdges == null) {
                 nodeEdges = new HashSet<Edge>();
+                res.put(node, nodeEdges);
             }
-            nodeEdges.add(key);
-            // We need to re-add to the MAP even if the element was
-            // already there so in case of clustered services the map
-            // gets updated in the cluster
-            res.put(node, nodeEdges);
+            nodeEdges.add(edge);
 
             // Lets analyze the head
-            node = key.getHeadNodeConnector().getNode();
+            node = edge.getHeadNodeConnector().getNode();
             nodeEdges = res.get(node);
             if (nodeEdges == null) {
                 nodeEdges = new HashSet<Edge>();
+                res.put(node, nodeEdges);
             }
-            nodeEdges.add(key);
-            // We need to re-add to the MAP even if the element was
-            // already there so in case of clustered services the map
-            // gets updated in the cluster
-            res.put(node, nodeEdges);
+            nodeEdges.add(edge);
         }
 
         return res;
@@ -320,11 +363,34 @@ public class TopologyManagerImpl implements ITopologyManager,
     }
 
     /**
-     * The Map returned is a copy of the current topology hence if the
-     * topology changes the copy doesn't
+     * This method returns true if the edge is an ISL link.
+     *
+     * @param e
+     *            The edge
+     * @return true if it is an ISL link
+     */
+    public boolean isISLink(Edge e) {
+        return (!isProductionLink(e));
+    }
+
+    /**
+     * This method returns true if the edge is a production link.
      *
-     * @return A Map representing the current topology expressed as
-     * edges of the network
+     * @param e
+     *            The edge
+     * @return true if it is a production link
+     */
+    public boolean isProductionLink(Edge e) {
+        return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
+                || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
+    }
+
+    /**
+     * The Map returned is a copy of the current topology hence if the topology
+     * changes the copy doesn't
+     *
+     * @return A Map representing the current topology expressed as edges of the
+     *         network
      */
     @Override
     public Map<Edge, Set<Property>> getEdges() {
@@ -332,30 +398,20 @@ public class TopologyManagerImpl implements ITopologyManager,
             return null;
         }
 
-        HashMap<Edge, Set<Property>> res = new HashMap<Edge, Set<Property>>();
-        for (Edge key : this.edgesDB.keySet()) {
+        Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
+        Set<Property> props;
+        for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
             // Sets of props are copied because the composition of
             // those properties could change with time
-            HashSet<Property> prop = new HashSet<Property>(this.edgesDB
-                    .get(key));
+            props = new HashSet<Property>(edgeEntry.getValue());
             // We can simply reuse the key because the object is
             // immutable so doesn't really matter that we are
             // referencing the only owned by a different table, the
             // meaning is the same because doesn't change with time.
-            res.put(key, prop);
+            edgeMap.put(edgeEntry.getKey(), props);
         }
 
-        return res;
-    }
-
-    // TODO remove with spring-dm removal
-    /**
-     * @param set the topologyAware to set
-     */
-    public void setTopologyAware(Set<Object> set) {
-        for (Object s : set) {
-            setTopologyManagerAware((ITopologyManagerAware) s);
-        }
+        return edgeMap;
     }
 
     @Override
@@ -364,7 +420,7 @@ public class TopologyManagerImpl implements ITopologyManager,
             return null;
         }
 
-        return (this.hostsDB.keySet());
+        return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
     }
 
     @Override
@@ -373,88 +429,117 @@ public class TopologyManagerImpl implements ITopologyManager,
             return null;
         }
         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
-
-        for (NodeConnector p : this.hostsDB.keySet()) {
-            Node n = p.getNode();
-            Set<NodeConnector> pSet = res.get(n);
-            if (pSet == null) {
+        Node node;
+        Set<NodeConnector> portSet;
+        for (NodeConnector nc : this.hostsDB.keySet()) {
+            node = nc.getNode();
+            portSet = res.get(node);
+            if (portSet == null) {
                 // Create the HashSet if null
-                pSet = new HashSet<NodeConnector>();
-                res.put(n, pSet);
+                portSet = new HashSet<NodeConnector>();
+                res.put(node, portSet);
             }
 
             // Keep updating the HashSet, given this is not a
             // clustered map we can just update the set without
             // worrying to update the hashmap.
-            pSet.add(p);
+            portSet.add(nc);
         }
 
         return (res);
     }
 
     @Override
-    public Host getHostAttachedToNodeConnector(NodeConnector p) {
-        if (this.hostsDB == null) {
-            return null;
+    public Host getHostAttachedToNodeConnector(NodeConnector port) {
+        List<Host> hosts = getHostsAttachedToNodeConnector(port);
+        if(hosts != null && !hosts.isEmpty()){
+            return hosts.get(0);
         }
+        return null;
+    }
 
-        return (this.hostsDB.get(p).getLeft());
+    @Override
+    public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
+        Set<ImmutablePair<Host, Set<Property>>> hosts;
+        if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
+            return null;
+        }
+        // create a list of hosts
+        List<Host> retHosts = new LinkedList<Host>();
+        for(ImmutablePair<Host, Set<Property>> host : hosts) {
+            retHosts.add(host.getLeft());
+        }
+        return retHosts;
     }
 
     @Override
-    public void updateHostLink(NodeConnector p, Host h, UpdateType t,
-            Set<Property> props) {
-        if (this.hostsDB == null) {
-            return;
+    public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
+
+        // Clone the property set in case non null else just
+        // create an empty one. Caches allocated via infinispan
+        // don't allow null values
+        if (props == null) {
+            props = new HashSet<Property>();
+        } else {
+            props = new HashSet<Property>(props);
         }
+        ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
 
+        // get the host list
+        Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
+        if(hostSet == null) {
+            hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
+        }
         switch (t) {
         case ADDED:
         case CHANGED:
-            // Clone the property set in case non null else just
-            // create an empty one. Caches allocated via infinispan
-            // don't allow null values
-            if (props == null) {
-                props = new HashSet<Property>();
-            } else {
-                props = new HashSet<Property>(props);
-            }
-
-            this.hostsDB.put(p, new ImmutablePair(h, props));
+            hostSet.add(thisHost);
+            this.hostsDB.put(port, hostSet);
             break;
         case REMOVED:
-            this.hostsDB.remove(p);
+            hostSet.remove(thisHost);
+            if(hostSet.isEmpty()) {
+                //remove only if hasn't been concurrently modified
+                this.hostsDB.remove(port, hostSet);
+            } else {
+                this.hostsDB.put(port, hostSet);
+            }
             break;
         }
     }
 
-    @Override
-    public void edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
+    private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
         switch (type) {
         case ADDED:
             // Make sure the props are non-null
             if (props == null) {
-                props = (Set<Property>) new HashSet();
+                props = new HashSet<Property>();
             } else {
-                // Copy the set so noone is going to change the content
-                props = (Set<Property>) new HashSet(props);
+                props = new HashSet<Property>(props);
+            }
+
+            //in case of node switch-over to a different cluster controller,
+            //let's retain edge props
+            Set<Property> currentProps = this.edgesDB.get(e);
+            if (currentProps != null){
+                props.addAll(currentProps);
             }
 
-            // Now make sure thre is the creation timestamp for the
-            // edge, if not there timestamp with the first update
+            // Now make sure there is the creation timestamp for the
+            // edge, if not therestamp with the first update
             boolean found_create = false;
             for (Property prop : props) {
                 if (prop instanceof TimeStamp) {
                     TimeStamp t = (TimeStamp) prop;
                     if (t.getTimeStampName().equals("creation")) {
                         found_create = true;
+                        break;
                     }
                 }
             }
 
             if (!found_create) {
-                TimeStamp t = new TimeStamp(System.currentTimeMillis(),
-                        "creation");
+                TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
                 props.add(t);
             }
 
@@ -463,12 +548,14 @@ public class TopologyManagerImpl implements ITopologyManager,
             this.edgesDB.put(e, props);
 
             // Now populate the DB of NodeConnectors
-            // NOTE WELL: properties are empy sets, not really needed
+            // NOTE WELL: properties are empty sets, not really needed
             // for now.
-            this.nodeConnectorsDB.put(e.getHeadNodeConnector(),
-                    new HashSet<Property>());
-            this.nodeConnectorsDB.put(e.getTailNodeConnector(),
-                    new HashSet<Property>());
+            // The DB only contains ISL ports
+            if (isISLink(e)) {
+                this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
+                this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
+            }
+            log.trace("Edge {}  {}", e.toString(), type.name());
             break;
         case REMOVED:
             // Now remove the edge from edgesDB
@@ -483,31 +570,32 @@ public class TopologyManagerImpl implements ITopologyManager,
             // should be safe to assume that won't happen.
             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
+            log.trace("Edge {}  {}", e.toString(), type.name());
             break;
         case CHANGED:
-            Set<Property> old_props = this.edgesDB.get(e);
+            Set<Property> oldProps = this.edgesDB.get(e);
 
             // When property changes lets make sure we can change it
             // all except the creation time stamp because that should
             // be changed only when the edge is destroyed and created
             // again
-            TimeStamp tc = null;
-            for (Property prop : old_props) {
+            TimeStamp timeStamp = null;
+            for (Property prop : oldProps) {
                 if (prop instanceof TimeStamp) {
-                    TimeStamp t = (TimeStamp) prop;
-                    if (t.getTimeStampName().equals("creation")) {
-                        tc = t;
+                    TimeStamp tsProp = (TimeStamp) prop;
+                    if (tsProp.getTimeStampName().equals("creation")) {
+                        timeStamp = tsProp;
+                        break;
                     }
                 }
             }
 
-            // Now lest make sure new properties are non-null
-            // Make sure the props are non-null
+            // Now lets make sure new properties are non-null
             if (props == null) {
-                props = (Set<Property>) new HashSet();
+                props = new HashSet<Property>();
             } else {
                 // Copy the set so noone is going to change the content
-                props = (Set<Property>) new HashSet(props);
+                props = new HashSet<Property>(props);
             }
 
             // Now lets remove the creation property if exist in the
@@ -518,198 +606,126 @@ public class TopologyManagerImpl implements ITopologyManager,
                     TimeStamp t = (TimeStamp) prop;
                     if (t.getTimeStampName().equals("creation")) {
                         i.remove();
+                        break;
                     }
                 }
             }
 
             // Now lets add the creation timestamp in it
-            if (tc != null) {
-                props.add(tc);
+            if (timeStamp != null) {
+                props.add(timeStamp);
             }
 
             // Finally update
             this.edgesDB.put(e, props);
+            log.trace("Edge {}  {}", e.toString(), type.name());
             break;
         }
+        return new TopoEdgeUpdate(e, props, type);
+    }
+
+    @Override
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+        for (int i = 0; i < topoedgeupdateList.size(); i++) {
+            Edge e = topoedgeupdateList.get(i).getEdge();
+            Set<Property> p = topoedgeupdateList.get(i).getProperty();
+            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
+            TopoEdgeUpdate teu = edgeUpdate(e, type, p);
+            teuList.add(teu);
+        }
 
         // Now update the listeners
         for (ITopologyManagerAware s : this.topologyManagerAware) {
             try {
-                s.edgeUpdate(e, type, props);
+                s.edgeUpdate(teuList);
             } catch (Exception exc) {
-                log.error("Exception on callback", exc);
+                log.error("Exception on edge update:", exc);
             }
         }
+
     }
 
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
-        TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(link
-                .getName(), link.getDstSwitchId(), link.getDstPort(), link
-                .getSrcSwitchId(), link.getSrcPort());
+        TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
+                link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
         return getLinkTuple(rLink);
     }
 
-    private Edge getLinkTuple(TopologyUserLinkConfig link) {
-        Edge linkTuple = null;
-        Long sID = link.getSrcSwitchIDLong();
-        Long dID = link.getDstSwitchIDLong();
-        Short srcPort = Short.valueOf((short) 0);
-        Short dstPort = Short.valueOf((short) 0);
-        if (link.isSrcPortByName()) {
-            // TODO find the inventory service to do this, for now 0
-            //srcPort = srcSw.getPortNumber(link.getSrcPort());
-        } else {
-            srcPort = Short.parseShort(link.getSrcPort());
-        }
-
-        if (link.isDstPortByName()) {
-            //dstPort = dstSw.getPortNumber(link.getDstPort());;
-        } else {
-            dstPort = Short.parseShort(link.getDstPort());
-        }
-
-        // if atleast 1 link exists for the srcPort and atleast 1 link exists for the dstPort
-        // that makes it ineligible for the Manual link addition
-        // This is just an extra protection to avoid mis-programming.
-        boolean srcLinkExists = false;
-        boolean dstLinkExists = false;
-        /**
-         * Disabling this optimization for now to understand the real benefit of doing this.
-         * Since this is a Manual Link addition, the user knows what he is doing and it is
-         * not good to restrict such creativity...
-         */
-        /*
-          Set <Edge> links = oneTopology.getLinks().keySet();
-          if (links != null) {
-          for (Edge eLink : links) {
-          if (!eLink.isUserCreated() &&
-          eLink.getSrc().getSid().equals(link.getSrcSwitchIDLong()) &&
-          eLink.getSrc().getPort().equals(srcPort)) {
-          srcLinkExists = true;
-          }
-
-          if (!eLink.isUserCreated() &&
-          eLink.getSrc().getSid().equals(link.getSrcSwitchIDLong()) &&
-          eLink.getSrc().getPort().equals(srcPort)) {
-          dstLinkExists = true;
-          }
-
-          if (!eLink.isUserCreated() &&
-          eLink.getDst().getSid().equals(link.getSrcSwitchIDLong()) &&
-          eLink.getDst().getPort().equals(srcPort)) {
-          srcLinkExists = true;
-          }
-
-          if (!eLink.isUserCreated() &&
-          eLink.getDst().getSid().equals(link.getSrcSwitchIDLong()) &&
-          eLink.getDst().getPort().equals(srcPort)) {
-          dstLinkExists = true;
-          }
-          }
-          }
-         */
-        //TODO check a way to validate the port with inventory services
-        //if (srcSw.getPorts().contains(srcPort) &&
-        //dstSw.getPorts().contains(srcPort) &&
-        if (!srcLinkExists && !dstLinkExists) {
-            Node sNode = null;
-            Node dNode = null;
-            NodeConnector sPort = null;
-            NodeConnector dPort = null;
-            linkTuple = null;
-            try {
-                sNode = new Node(Node.NodeIDType.OPENFLOW, sID);
-                dNode = new Node(Node.NodeIDType.OPENFLOW, dID);
-                sPort = new NodeConnector(
-                        NodeConnector.NodeConnectorIDType.OPENFLOW, srcPort,
-                        sNode);
-                dPort = new NodeConnector(
-                        NodeConnector.NodeConnectorIDType.OPENFLOW, dstPort,
-                        dNode);
-                linkTuple = new Edge(sPort, dPort);
-            } catch (ConstructionException cex) {
-            }
-            return linkTuple;
-        }
 
-        if (srcLinkExists && dstLinkExists) {
-            link.setStatus(TopologyUserLinkConfig.STATUS.INCORRECT);
+    private Edge getLinkTuple(TopologyUserLinkConfig link) {
+        NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
+        NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
+        try {
+            return new Edge(srcNodeConnector, dstNodeConnector);
+        } catch (Exception e) {
+            return null;
         }
-        return null;
     }
 
     @Override
     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
-        return userLinks;
+        return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
     }
 
     @Override
-    public Status addUserLink(TopologyUserLinkConfig link) {
-        if (!link.isValid()) {
-            return new Status(StatusCode.BADREQUEST, 
-                       "Configuration Invalid. Please check the parameters");
-        }
-        if (userLinks.get(link.getName()) != null) {
-            return new Status(StatusCode.CONFLICT, 
-                       "Link with name : " + link.getName()
-                    + " already exists. Please use another name");
+    public Status addUserLink(TopologyUserLinkConfig userLink) {
+        if (!userLink.isValid()) {
+            return new Status(StatusCode.BADREQUEST,
+                    "User link configuration invalid.");
+        }
+        userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
+
+        //Check if this link already configured
+        //NOTE: infinispan cache doesn't support Map.containsValue()
+        // (which is linear time in most ConcurrentMap impl anyway)
+        for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
+            if (existingLink.equals(userLink)) {
+                return new Status(StatusCode.CONFLICT, "Link configuration exists");
+            }
         }
-        if (userLinks.containsValue(link)) {
-            return new Status(StatusCode.CONFLICT, "Link configuration exists");
+        //attempt put, if mapping for this key already existed return conflict
+        if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
+            return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
+                    + " already exists. Please use another name");
         }
 
-        link.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
-        userLinks.put(link.getName(), link);
-
-        Edge linkTuple = getLinkTuple(link);
+        Edge linkTuple = getLinkTuple(userLink);
         if (linkTuple != null) {
-            try {
-                // TODO The onetopology will be gone too, topology
-                //manager is the master of the topology at this point
-                //if (oneTopology.addUserConfiguredLink(linkTuple)) {
-                linkTuple = getReverseLinkTuple(link);
-                //if (oneTopology.addUserConfiguredLink(linkTuple)) {
-                link.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
-                //}
-                //}
-            } catch (Exception e) {
-                return new Status(StatusCode.INTERNALERROR,
-                               "Exception while adding custom link : " + 
-                                               e.getMessage());
+            if (!isProductionLink(linkTuple)) {
+                edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
+            }
+
+            linkTuple = getReverseLinkTuple(userLink);
+            if (linkTuple != null) {
+                userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
+                if (!isProductionLink(linkTuple)) {
+                    edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
+                }
             }
         }
-        return new Status(StatusCode.SUCCESS, null);
+        return new Status(StatusCode.SUCCESS);
     }
 
     @Override
     public Status deleteUserLink(String linkName) {
         if (linkName == null) {
-            return new Status(StatusCode.BADREQUEST, 
-                       "A valid linkName is required to Delete a link");
+            return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
         }
 
-        TopologyUserLinkConfig link = userLinks.get(linkName);
-
-        Edge linkTuple = getLinkTuple(link);
-        userLinks.remove(linkName);
-        if (linkTuple != null) {
-            try {
-                //oneTopology.deleteUserConfiguredLink(linkTuple);
-            } catch (Exception e) {
-                log
-                        .warn("Harmless : Exception while Deleting User Configured link "
-                                + link + " " + e.toString());
+        TopologyUserLinkConfig link = userLinksDB.remove(linkName);
+        Edge linkTuple;
+        if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
+            if (! isProductionLink(linkTuple)) {
+                edgeUpdate(linkTuple, UpdateType.REMOVED, null);
             }
+
             linkTuple = getReverseLinkTuple(link);
-            try {
-                //oneTopology.deleteUserConfiguredLink(linkTuple);
-            } catch (Exception e) {
-                log
-                        .error("Harmless : Exception while Deleting User Configured Reverse link "
-                                + link + " " + e.toString());
+            if (! isProductionLink(linkTuple)) {
+                edgeUpdate(linkTuple, UpdateType.REMOVED, null);
             }
         }
-        return new Status(StatusCode.SUCCESS, null);
+        return new Status(StatusCode.SUCCESS);
     }
 
     private void registerWithOSGIConsole() {
@@ -723,67 +739,57 @@ public class TopologyManagerImpl implements ITopologyManager,
     public String getHelp() {
         StringBuffer help = new StringBuffer();
         help.append("---Topology Manager---\n");
-        help
-                .append("\t addTopo name <src-sw-id> <port-number> <dst-sw-id> <port-number>\n");
-        help.append("\t delTopo name\n");
-        help.append("\t _printTopo\n");
+        help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
+        help.append("\t deleteUserLink <name>\n");
+        help.append("\t printUserLink\n");
+        help.append("\t printNodeEdges\n");
         return help.toString();
     }
 
-    public void _printTopo(CommandInterpreter ci) {
-        for (String name : this.userLinks.keySet()) {
-            ci.println(name + " : " + userLinks.get(name));
+    public void _printUserLink(CommandInterpreter ci) {
+        for (String name : this.userLinksDB.keySet()) {
+            TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
+            ci.println("Name : " + name);
+            ci.println(linkConfig);
+            ci.println("Edge " + getLinkTuple(linkConfig));
+            ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
         }
     }
 
-    public void _addTopo(CommandInterpreter ci) {
+    public void _addUserLink(CommandInterpreter ci) {
         String name = ci.nextArgument();
         if ((name == null)) {
             ci.println("Please enter a valid Name");
             return;
         }
 
-        String dpid = ci.nextArgument();
-        if (dpid == null) {
-            ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
+        String ncStr1 = ci.nextArgument();
+        if (ncStr1 == null) {
+            ci.println("Please enter two node connector strings");
             return;
         }
-        try {
-            HexEncode.stringToLong(dpid);
-        } catch (Exception e) {
-            ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
+        String ncStr2 = ci.nextArgument();
+        if (ncStr2 == null) {
+            ci.println("Please enter second node connector string");
             return;
         }
 
-        String port = ci.nextArgument();
-        if (port == null) {
-            ci.println("Invalid port number");
+        NodeConnector nc1 = NodeConnector.fromString(ncStr1);
+        if (nc1 == null) {
+            ci.println("Invalid input node connector 1 string: " + ncStr1);
             return;
         }
-
-        String ddpid = ci.nextArgument();
-        if (ddpid == null) {
-            ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
-            return;
-        }
-        try {
-            HexEncode.stringToLong(ddpid);
-        } catch (Exception e) {
-            ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
+        NodeConnector nc2 = NodeConnector.fromString(ncStr2);
+        if (nc2 == null) {
+            ci.println("Invalid input node connector 2 string: " + ncStr2);
             return;
         }
 
-        String dport = ci.nextArgument();
-        if (dport == null) {
-            ci.println("Invalid port number");
-            return;
-        }
-        TopologyUserLinkConfig config = new TopologyUserLinkConfig(name,
-                dpid, port, ddpid, dport);
+        TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
         ci.println(this.addUserLink(config));
     }
 
-    public void _delTopo(CommandInterpreter ci) {
+    public void _deleteUserLink(CommandInterpreter ci) {
         String name = ci.nextArgument();
         if ((name == null)) {
             ci.println("Please enter a valid Name");
@@ -792,10 +798,30 @@ public class TopologyManagerImpl implements ITopologyManager,
         this.deleteUserLink(name);
     }
 
+    public void _printNodeEdges(CommandInterpreter ci) {
+        Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
+        if (nodeEdges == null) {
+            return;
+        }
+        Set<Node> nodeSet = nodeEdges.keySet();
+        if (nodeSet == null) {
+            return;
+        }
+        ci.println("        Node                                         Edge");
+        for (Node node : nodeSet) {
+            Set<Edge> edgeSet = nodeEdges.get(node);
+            if (edgeSet == null) {
+                continue;
+            }
+            for (Edge edge : edgeSet) {
+                ci.println(node + "             " + edge);
+            }
+        }
+    }
+
     @Override
     public Object readObject(ObjectInputStream ois)
             throws FileNotFoundException, IOException, ClassNotFoundException {
-        // TODO Auto-generated method stub
         return ois.readObject();
     }
 
@@ -806,12 +832,100 @@ public class TopologyManagerImpl implements ITopologyManager,
 
     @Override
     public void edgeOverUtilized(Edge edge) {
-        log.warn("Link Utilization above normal: " + edge);
+        log.warn("Link Utilization above normal: {}", edge);
     }
 
     @Override
     public void edgeUtilBackToNormal(Edge edge) {
-        log.warn("Link Utilization back to normal: " + edge);
+        log.warn("Link Utilization back to normal: {}", edge);
+    }
+
+    private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+        TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+        upd.setLocal(isLocal);
+        notifyQ.add(upd);
+    }
+
+    @Override
+    public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            // This is the case of an Edge being added to the topology DB
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+        }
+    }
+
+    @Override
+    public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+            final Set<Property> props = (Set<Property>) new_value;
+            edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+        }
     }
 
+    @Override
+    public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+        }
+    }
+
+    class TopologyNotify implements Runnable {
+        private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+        private TopoEdgeUpdate entry;
+        private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+        private boolean notifyListeners;
+
+        TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+            this.notifyQ = notifyQ;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    log.trace("New run of TopologyNotify");
+                    notifyListeners = false;
+                    // First we block waiting for an element to get in
+                    entry = notifyQ.take();
+                    // Then we drain the whole queue if elements are
+                    // in it without getting into any blocking condition
+                    for (; entry != null; entry = notifyQ.poll()) {
+                        teuList.add(entry);
+                        notifyListeners = true;
+                    }
+
+                    // Notify listeners only if there were updates drained else
+                    // give up
+                    if (notifyListeners) {
+                        log.trace("Notifier thread, notified a listener");
+                        // Now update the listeners
+                        for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+                            try {
+                                s.edgeUpdate(teuList);
+                            } catch (Exception exc) {
+                                log.error("Exception on edge update:", exc);
+                            }
+                        }
+                    }
+                    teuList.clear();
+
+                    // Lets sleep for sometime to allow aggregation of event
+                    Thread.sleep(100);
+                } catch (InterruptedException e1) {
+                    log.warn("TopologyNotify interrupted {}", e1.getMessage());
+                    if (shuttingDown) {
+                        return;
+                    }
+                } catch (Exception e2) {
+                    log.error("", e2);
+                }
+            }
+        }
+    }
 }