Bug 2158: Fixed TopologyManager for edge updates. 76/12876/7
authorHideyuki Tai <Hideyuki.Tai@necam.com>
Sat, 15 Nov 2014 00:55:46 +0000 (19:55 -0500)
committerHideyuki Tai <Hideyuki.Tai@necam.com>
Tue, 23 Dec 2014 23:30:44 +0000 (18:30 -0500)
MD-SAL does not keep the order of notification of data changes. In the
result, TopologyManager sometimes receives a notification of an edge
addition before it is notified of the addition of the NodeConnector
which is the head of the edge. In such a case, TopologyManager ignores
the edge notification, and fails to create correct topology information.
This is one of the root causes of Bug 2158.
To handle this issue, this patch fixed TopologyManager to queue edge
notifications, and wait corresponding NodeConnector updates.

Change-Id: Ifeabc91d856eb9cc88ae3595d14e0a6819ec7454
Signed-off-by: Hideyuki Tai <Hideyuki.Tai@necam.com>
opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java
opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java
opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java

index 80d7083ec0dee0059a8b7c0c33720522b65fb4bc..8c422a52ea783633d386de7ea942fbbdbf4afb38 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -22,6 +21,7 @@ import org.opendaylight.controller.configuration.IConfigurationContainerService;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -73,6 +73,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             props.put("cachenames", propSet);
 
             c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
             props.put("cachenames", propSet);
 
             c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
+                    IInventoryListener.class.getName(),
                     ITopologyManager.class.getName(),
                     ITopologyManagerShell.class.getName(),
                     IConfigurationContainerAware.class.getName(),
                     ITopologyManager.class.getName(),
                     ITopologyManagerShell.class.getName(),
                     IConfigurationContainerAware.class.getName(),
index 659ee7dd81ca83d91c013ceddb7017edca9a8b1b..e1a0ca1e7686c39a8e7563e6f50cb81254fe8d99 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -58,6 +59,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -76,6 +79,7 @@ public class TopologyManagerImpl implements
         IConfigurationContainerAware,
         IListenTopoUpdates,
         IObjectReader,
         IConfigurationContainerAware,
         IListenTopoUpdates,
         IObjectReader,
+        IInventoryListener,
         CommandProvider {
     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
         CommandProvider {
     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
@@ -83,6 +87,8 @@ public class TopologyManagerImpl implements
     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+    private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
     private ITopologyService topoService;
     private IClusterContainerServices clusterContainerService;
     private IConfigurationContainerService configurationService;
     private ITopologyService topoService;
     private IClusterContainerServices clusterContainerService;
     private IConfigurationContainerService configurationService;
@@ -104,7 +110,79 @@ public class TopologyManagerImpl implements
     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
     private volatile Boolean shuttingDown = false;
     private Thread notifyThread;
     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
     private volatile Boolean shuttingDown = false;
     private Thread notifyThread;
+    private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+        new HashMap<NodeConnector, List<PendingUpdateTask>>();
+    private final BlockingQueue<TopoEdgeUpdate> updateQ =
+        new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private Timer pendingTimer;
+    private Thread updateThread;
+
+    private class PendingEdgeUpdate extends TopoEdgeUpdate {
+        private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+            super(e, p, t);
+        }
+    }
+
+    private class UpdateTopology implements Runnable {
+        @Override
+        public void run() {
+            log.trace("Start topology update thread");
+
+            while (!shuttingDown) {
+                try {
+                    List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+                    TopoEdgeUpdate teu = updateQ.take();
+                    for (; teu != null; teu = updateQ.poll()) {
+                        list.add(teu);
+                    }
+
+                    if (!list.isEmpty()) {
+                        log.trace("Update edges: {}", list);
+                        doEdgeUpdate(list);
+                    }
+                } catch (InterruptedException e) {
+                    if (shuttingDown) {
+                        break;
+                    }
+                    log.warn("Topology update thread interrupted", e);
+                } catch (Exception e) {
+                    log.error("Exception on topology update thread", e);
+                }
+            }
+
+            log.trace("Exit topology update thread");
+        }
+    }
+
+    private class PendingUpdateTask extends TimerTask {
+        private final Edge  edge;
+        private final Set<Property>  props;
+        private final UpdateType  type;
+
+        private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+            edge = e;
+            props = p;
+            type = t;
+        }
+
+        private NodeConnector getHeadNodeConnector() {
+            return edge.getHeadNodeConnector();
+        }
+
+        private void flush() {
+            log.info("Flush pending topology update: edge {}, type {}",
+                     edge, type);
+            updateQ.add(new PendingEdgeUpdate(edge, props, type));
+        }
 
 
+        @Override
+        public void run() {
+            if (removePendingEvent(this)) {
+                log.warn("Pending topology update timed out: edge{}, type {}",
+                         edge, type);
+            }
+        }
+    }
 
     void nonClusterObjectCreate() {
         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
 
     void nonClusterObjectCreate() {
         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
@@ -210,6 +288,8 @@ public class TopologyManagerImpl implements
         // Restore the shuttingDown status on init of the component
         shuttingDown = false;
         notifyThread = new Thread(new TopologyNotify(notifyQ));
         // Restore the shuttingDown status on init of the component
         shuttingDown = false;
         notifyThread = new Thread(new TopologyNotify(notifyQ));
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
     }
 
     @SuppressWarnings({ "unchecked" })
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -277,6 +357,8 @@ public class TopologyManagerImpl implements
      *
      */
     void started() {
      *
      */
     void started() {
+        updateThread.start();
+
         // Start the batcher thread for the cluster wide topology updates
         notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
         // Start the batcher thread for the cluster wide topology updates
         notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
@@ -287,7 +369,9 @@ public class TopologyManagerImpl implements
 
     void stop() {
         shuttingDown = true;
 
     void stop() {
         shuttingDown = true;
+        updateThread.interrupt();
         notifyThread.interrupt();
         notifyThread.interrupt();
+        pendingTimer.cancel();
     }
 
     /**
     }
 
     /**
@@ -297,6 +381,9 @@ public class TopologyManagerImpl implements
      *
      */
     void destroy() {
      *
      */
     void destroy() {
+        updateQ.clear();
+        updateThread = null;
+        pendingTimer = null;
         notifyQ.clear();
         notifyThread = null;
     }
         notifyQ.clear();
         notifyThread = null;
     }
@@ -571,17 +658,100 @@ public class TopologyManagerImpl implements
         return (switchManager.doesNodeConnectorExist(head));
     }
 
         return (switchManager.doesNodeConnectorExist(head));
     }
 
+    private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list == null) {
+                list = new LinkedList<PendingUpdateTask>();
+                pendingUpdates.put(head, list);
+            }
+            list.add(task);
+            pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+        }
+    }
+
+    private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                log.warn("Enqueue edge update: edge {}, type {}", e, t);
+                PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+                list.add(task);
+                pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean removePendingEvent(PendingUpdateTask t) {
+        t.cancel();
+        NodeConnector head = t.getHeadNodeConnector();
+        boolean removed = false;
+
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                removed = list.remove(t);
+                if (list.isEmpty()) {
+                    pendingUpdates.remove(head);
+                }
+            }
+        }
+
+        return removed;
+    }
+
+    private void removePendingEvent(NodeConnector head, boolean doFlush) {
+        List<PendingUpdateTask> list;
+        synchronized (pendingUpdates) {
+            list = pendingUpdates.remove(head);
+        }
+
+        if (list != null) {
+            for (PendingUpdateTask task : list) {
+                if (task.cancel() && doFlush) {
+                    task.flush();
+                }
+            }
+            pendingTimer.purge();
+        }
+    }
+
     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
-        switch (type) {
-        case ADDED:
+        return edgeUpdate(e, type, props, false);
+    }
 
 
+    private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+        if (!type.equals(UpdateType.ADDED) &&
+            enqueueEventIfPending(e, props, type)) {
+            return null;
+        }
 
 
+        switch (type) {
+        case ADDED:
             if (this.edgesDB.containsKey(e)) {
                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
                 log.trace("Skipping redundant edge addition: {}", e);
                 return null;
             }
 
             if (this.edgesDB.containsKey(e)) {
                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
                 log.trace("Skipping redundant edge addition: {}", e);
                 return null;
             }
 
+            // Ensure that head node connector exists
+            if (!isPending) {
+                if (headNodeConnectorExist(e)) {
+                    removePendingEvent(e.getHeadNodeConnector(), true);
+                } else {
+                    log.warn("Ignore edge that contains invalid node connector: {}",
+                             e);
+                    addPendingEvent(e, props, type);
+                    return null;
+                }
+            }
+
             // Make sure the props are non-null or create a copy
             if (props == null) {
                 props = new HashSet<Property>();
             // Make sure the props are non-null or create a copy
             if (props == null) {
                 props = new HashSet<Property>();
@@ -589,13 +759,6 @@ public class TopologyManagerImpl implements
                 props = new HashSet<Property>(props);
             }
 
                 props = new HashSet<Property>(props);
             }
 
-
-            // Ensure that head node connector exists
-            if (!headNodeConnectorExist(e)) {
-                log.warn("Ignore edge that contains invalid node connector: {}", e);
-                return null;
-            }
-
             // Check if nodeConnectors of the edge were correctly categorized
             // by protocol plugin
             crossCheckNodeConnectors(e);
             // Check if nodeConnectors of the edge were correctly categorized
             // by protocol plugin
             crossCheckNodeConnectors(e);
@@ -702,16 +865,16 @@ public class TopologyManagerImpl implements
         return new TopoEdgeUpdate(e, props, type);
     }
 
         return new TopoEdgeUpdate(e, props, type);
     }
 
-    @Override
-    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+    private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
-        for (int i = 0; i < topoedgeupdateList.size(); i++) {
-            Edge e = topoedgeupdateList.get(i).getEdge();
-            Set<Property> p = topoedgeupdateList.get(i).getProperty();
-            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
-            TopoEdgeUpdate teu = edgeUpdate(e, type, p);
-            if (teu != null) {
-                teuList.add(teu);
+        for (TopoEdgeUpdate teu : topoedgeupdateList) {
+            boolean isPending = (teu instanceof PendingEdgeUpdate);
+            Edge e = teu.getEdge();
+            Set<Property> p = teu.getProperty();
+            UpdateType type = teu.getUpdateType();
+            TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+            if (update != null) {
+                teuList.add(update);
             }
         }
 
             }
         }
 
@@ -727,6 +890,11 @@ public class TopologyManagerImpl implements
         }
     }
 
         }
     }
 
+    @Override
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        updateQ.addAll(topoedgeupdateList);
+    }
+
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
@@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements
         notifyQ.add(upd);
     }
 
         notifyQ.add(upd);
     }
 
+    @Override
+    public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+        // NOP
+    }
+
+    @Override
+    public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+        // Remove pending edge updates for the given node connector.
+        // Pending events should be notified if the node connector exists.
+        boolean doFlush = !type.equals(UpdateType.REMOVED);
+        removePendingEvent(nc, doFlush);
+    }
+
     @Override
     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
         if (cacheName.equals(TOPOEDGESDB)) {
     @Override
     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
         if (cacheName.equals(TOPOEDGESDB)) {
@@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements
         return result;
     }
 
         return result;
     }
 
+    // Only for unit test.
+    void startTest() {
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
+        updateThread.start();
+    }
+
+    void stopTest() {
+        shuttingDown = true;
+        updateThread.interrupt();
+        pendingTimer.cancel();
+    }
+
+    boolean flushUpdateQueue(long timeout) {
+        long limit = System.currentTimeMillis() + timeout;
+        long cur;
+        do {
+            if (updateQ.peek() == null) {
+                return true;
+            }
+
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                break;
+            }
+            cur = System.currentTimeMillis();
+        } while (cur < limit);
+
+        return false;
+    }
 }
 }
index d1338bf6953909aff8ff1c4bea274001f9135e5c..600f1d8cbfafb05162be7cba530e6b4c05505d3a 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.topologymanager.internal;
 
 import org.junit.Assert;
 package org.opendaylight.controller.topologymanager.internal;
 
 import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.junit.Test;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
@@ -50,6 +52,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 public class TopologyManagerImplTest {
 import java.util.concurrent.ConcurrentMap;
 
 public class TopologyManagerImplTest {
+    private TopologyManagerImpl topoManagerImpl;
+
     /**
      * Mockup of switch manager that only maintains existence of node
      * connector.
     /**
      * Mockup of switch manager that only maintains existence of node
      * connector.
@@ -78,6 +82,11 @@ public class TopologyManagerImplTest {
             }
         }
 
             }
         }
 
+        private void clear() {
+            nodeSet.clear();
+            nodeConnectorSet.clear();
+        }
+
         @Override
         public Status addSubnet(SubnetConfig configObject) {
             return null;
         @Override
         public Status addSubnet(SubnetConfig configObject) {
             return null;
@@ -325,6 +334,20 @@ public class TopologyManagerImplTest {
         }
     }
 
         }
     }
 
+    @Before
+    public void setUp() {
+        topoManagerImpl = new TopologyManagerImpl();
+        topoManagerImpl.startTest();
+    }
+
+    @After
+    public void tearDown() {
+        if (topoManagerImpl != null) {
+            topoManagerImpl.stopTest();
+            topoManagerImpl = null;
+        }
+    }
+
     /*
      * Sets the node, edges and properties for edges here: Edge <SwitchId :
      * NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
     /*
      * Sets the node, edges and properties for edges here: Edge <SwitchId :
      * NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
@@ -375,11 +398,12 @@ public class TopologyManagerImplTest {
             topoedgeupdateList.add(teu2);
             topoManagerImpl.edgeUpdate(topoedgeupdateList);
         }
             topoedgeupdateList.add(teu2);
             topoManagerImpl.edgeUpdate(topoedgeupdateList);
         }
+
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
     }
 
     @Test
     public void testGetNodeEdges() throws ConstructionException {
     }
 
     @Test
     public void testGetNodeEdges() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
@@ -412,7 +436,6 @@ public class TopologyManagerImplTest {
 
     @Test
     public void testGetEdges() throws ConstructionException {
 
     @Test
     public void testGetEdges() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
@@ -496,7 +519,6 @@ public class TopologyManagerImplTest {
         TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
                 "OF|10@OF|20", "OF|10@OF|30");
 
         TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
                 "OF|10@OF|20", "OF|10@OF|30");
 
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -529,7 +551,6 @@ public class TopologyManagerImplTest {
     public void testGetUserLink() {
         TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
         TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
     public void testGetUserLink() {
         TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
         TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -614,7 +635,6 @@ public class TopologyManagerImplTest {
     @Test
     public void testHostLinkMethods() throws ConstructionException,
     UnknownHostException {
     @Test
     public void testHostLinkMethods() throws ConstructionException,
     UnknownHostException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -678,7 +698,6 @@ public class TopologyManagerImplTest {
     @Test
     public void testGetNodesWithNodeConnectorHost()
             throws ConstructionException, UnknownHostException {
     @Test
     public void testGetNodesWithNodeConnectorHost()
             throws ConstructionException, UnknownHostException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -738,7 +757,6 @@ public class TopologyManagerImplTest {
 
     @Test
     public void bug1348FixTest() throws ConstructionException {
 
     @Test
     public void bug1348FixTest() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -763,7 +781,91 @@ public class TopologyManagerImplTest {
             Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
         }
 
             Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
         }
 
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
         Assert.assertEquals(1, topoManagerImpl.getEdges().size());
         Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
     }
         Assert.assertEquals(1, topoManagerImpl.getEdges().size());
         Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
     }
+
+    @Test
+    public void testNotifyNodeConnector() throws ConstructionException {
+        TestSwitchManager swMgr = new TestSwitchManager();
+        topoManagerImpl.setSwitchManager(swMgr);
+        topoManagerImpl.nonClusterObjectCreate();
+
+        // Test NodeConnector notification in the case that there are no
+        // related edge updates.
+        NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector(
+                (short) 1, NodeCreator.createOFNode(1000L));
+        Map<String, Property> propMap = new HashMap<>();
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        swMgr.clear();
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        // Test NodeConnector notification in the case that there is a related
+        // edge update just before the notification.
+        NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector(
+                (short) 2, NodeCreator.createOFNode(2000L));
+        Edge edge1 = new Edge(nc1, nc2);
+        Edge edge2 = new Edge(nc2, nc1);
+        Set<Property> props = new HashSet<Property>();
+        TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+        TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+        List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        swMgr.addNodeConnectors(nc2);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(2, topoManagerImpl.getEdges().size());
+
+        teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+        teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+        topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+
+        swMgr.clear();
+
+        // Test NodeConnector notification in the case that there are multiple
+        // edge updates related to the NodeConnector just before the notification.
+        teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+        teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+        TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED);
+        TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED);
+        TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+        TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+        topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoedgeupdateList.add(teu3);
+        topoedgeupdateList.add(teu4);
+        topoedgeupdateList.add(teu5);
+        topoedgeupdateList.add(teu6);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        swMgr.addNodeConnectors(nc2);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+    }
 }
 }