Bug 2158: Fixed TopologyManager for edge updates. 76/12876/7
authorHideyuki Tai <Hideyuki.Tai@necam.com>
Sat, 15 Nov 2014 00:55:46 +0000 (19:55 -0500)
committerHideyuki Tai <Hideyuki.Tai@necam.com>
Tue, 23 Dec 2014 23:30:44 +0000 (18:30 -0500)
MD-SAL does not keep the order of notification of data changes. In the
result, TopologyManager sometimes receives a notification of an edge
addition before it is notified of the addition of the NodeConnector
which is the head of the edge. In such a case, TopologyManager ignores
the edge notification, and fails to create correct topology information.
This is one of the root causes of Bug 2158.
To handle this issue, this patch fixed TopologyManager to queue edge
notifications, and wait corresponding NodeConnector updates.

Change-Id: Ifeabc91d856eb9cc88ae3595d14e0a6819ec7454
Signed-off-by: Hideyuki Tai <Hideyuki.Tai@necam.com>
opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java
opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java
opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java

index 80d7083..8c422a5 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -22,6 +21,7 @@ import org.opendaylight.controller.configuration.IConfigurationContainerService;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -73,6 +73,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             props.put("cachenames", propSet);
 
             c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
+                    IInventoryListener.class.getName(),
                     ITopologyManager.class.getName(),
                     ITopologyManagerShell.class.getName(),
                     IConfigurationContainerAware.class.getName(),
index 659ee7d..e1a0ca1 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -58,6 +59,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -76,6 +79,7 @@ public class TopologyManagerImpl implements
         IConfigurationContainerAware,
         IListenTopoUpdates,
         IObjectReader,
+        IInventoryListener,
         CommandProvider {
     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
@@ -83,6 +87,8 @@ public class TopologyManagerImpl implements
     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+    private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
     private ITopologyService topoService;
     private IClusterContainerServices clusterContainerService;
     private IConfigurationContainerService configurationService;
@@ -104,7 +110,79 @@ public class TopologyManagerImpl implements
     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
     private volatile Boolean shuttingDown = false;
     private Thread notifyThread;
+    private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+        new HashMap<NodeConnector, List<PendingUpdateTask>>();
+    private final BlockingQueue<TopoEdgeUpdate> updateQ =
+        new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private Timer pendingTimer;
+    private Thread updateThread;
+
+    private class PendingEdgeUpdate extends TopoEdgeUpdate {
+        private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+            super(e, p, t);
+        }
+    }
+
+    private class UpdateTopology implements Runnable {
+        @Override
+        public void run() {
+            log.trace("Start topology update thread");
+
+            while (!shuttingDown) {
+                try {
+                    List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+                    TopoEdgeUpdate teu = updateQ.take();
+                    for (; teu != null; teu = updateQ.poll()) {
+                        list.add(teu);
+                    }
+
+                    if (!list.isEmpty()) {
+                        log.trace("Update edges: {}", list);
+                        doEdgeUpdate(list);
+                    }
+                } catch (InterruptedException e) {
+                    if (shuttingDown) {
+                        break;
+                    }
+                    log.warn("Topology update thread interrupted", e);
+                } catch (Exception e) {
+                    log.error("Exception on topology update thread", e);
+                }
+            }
+
+            log.trace("Exit topology update thread");
+        }
+    }
+
+    private class PendingUpdateTask extends TimerTask {
+        private final Edge  edge;
+        private final Set<Property>  props;
+        private final UpdateType  type;
+
+        private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+            edge = e;
+            props = p;
+            type = t;
+        }
+
+        private NodeConnector getHeadNodeConnector() {
+            return edge.getHeadNodeConnector();
+        }
+
+        private void flush() {
+            log.info("Flush pending topology update: edge {}, type {}",
+                     edge, type);
+            updateQ.add(new PendingEdgeUpdate(edge, props, type));
+        }
 
+        @Override
+        public void run() {
+            if (removePendingEvent(this)) {
+                log.warn("Pending topology update timed out: edge{}, type {}",
+                         edge, type);
+            }
+        }
+    }
 
     void nonClusterObjectCreate() {
         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
@@ -210,6 +288,8 @@ public class TopologyManagerImpl implements
         // Restore the shuttingDown status on init of the component
         shuttingDown = false;
         notifyThread = new Thread(new TopologyNotify(notifyQ));
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -277,6 +357,8 @@ public class TopologyManagerImpl implements
      *
      */
     void started() {
+        updateThread.start();
+
         // Start the batcher thread for the cluster wide topology updates
         notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
@@ -287,7 +369,9 @@ public class TopologyManagerImpl implements
 
     void stop() {
         shuttingDown = true;
+        updateThread.interrupt();
         notifyThread.interrupt();
+        pendingTimer.cancel();
     }
 
     /**
@@ -297,6 +381,9 @@ public class TopologyManagerImpl implements
      *
      */
     void destroy() {
+        updateQ.clear();
+        updateThread = null;
+        pendingTimer = null;
         notifyQ.clear();
         notifyThread = null;
     }
@@ -571,17 +658,100 @@ public class TopologyManagerImpl implements
         return (switchManager.doesNodeConnectorExist(head));
     }
 
+    private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list == null) {
+                list = new LinkedList<PendingUpdateTask>();
+                pendingUpdates.put(head, list);
+            }
+            list.add(task);
+            pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+        }
+    }
+
+    private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                log.warn("Enqueue edge update: edge {}, type {}", e, t);
+                PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+                list.add(task);
+                pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean removePendingEvent(PendingUpdateTask t) {
+        t.cancel();
+        NodeConnector head = t.getHeadNodeConnector();
+        boolean removed = false;
+
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                removed = list.remove(t);
+                if (list.isEmpty()) {
+                    pendingUpdates.remove(head);
+                }
+            }
+        }
+
+        return removed;
+    }
+
+    private void removePendingEvent(NodeConnector head, boolean doFlush) {
+        List<PendingUpdateTask> list;
+        synchronized (pendingUpdates) {
+            list = pendingUpdates.remove(head);
+        }
+
+        if (list != null) {
+            for (PendingUpdateTask task : list) {
+                if (task.cancel() && doFlush) {
+                    task.flush();
+                }
+            }
+            pendingTimer.purge();
+        }
+    }
+
     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
-        switch (type) {
-        case ADDED:
+        return edgeUpdate(e, type, props, false);
+    }
 
+    private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+        if (!type.equals(UpdateType.ADDED) &&
+            enqueueEventIfPending(e, props, type)) {
+            return null;
+        }
 
+        switch (type) {
+        case ADDED:
             if (this.edgesDB.containsKey(e)) {
                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
                 log.trace("Skipping redundant edge addition: {}", e);
                 return null;
             }
 
+            // Ensure that head node connector exists
+            if (!isPending) {
+                if (headNodeConnectorExist(e)) {
+                    removePendingEvent(e.getHeadNodeConnector(), true);
+                } else {
+                    log.warn("Ignore edge that contains invalid node connector: {}",
+                             e);
+                    addPendingEvent(e, props, type);
+                    return null;
+                }
+            }
+
             // Make sure the props are non-null or create a copy
             if (props == null) {
                 props = new HashSet<Property>();
@@ -589,13 +759,6 @@ public class TopologyManagerImpl implements
                 props = new HashSet<Property>(props);
             }
 
-
-            // Ensure that head node connector exists
-            if (!headNodeConnectorExist(e)) {
-                log.warn("Ignore edge that contains invalid node connector: {}", e);
-                return null;
-            }
-
             // Check if nodeConnectors of the edge were correctly categorized
             // by protocol plugin
             crossCheckNodeConnectors(e);
@@ -702,16 +865,16 @@ public class TopologyManagerImpl implements
         return new TopoEdgeUpdate(e, props, type);
     }
 
-    @Override
-    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+    private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
-        for (int i = 0; i < topoedgeupdateList.size(); i++) {
-            Edge e = topoedgeupdateList.get(i).getEdge();
-            Set<Property> p = topoedgeupdateList.get(i).getProperty();
-            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
-            TopoEdgeUpdate teu = edgeUpdate(e, type, p);
-            if (teu != null) {
-                teuList.add(teu);
+        for (TopoEdgeUpdate teu : topoedgeupdateList) {
+            boolean isPending = (teu instanceof PendingEdgeUpdate);
+            Edge e = teu.getEdge();
+            Set<Property> p = teu.getProperty();
+            UpdateType type = teu.getUpdateType();
+            TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+            if (update != null) {
+                teuList.add(update);
             }
         }
 
@@ -727,6 +890,11 @@ public class TopologyManagerImpl implements
         }
     }
 
+    @Override
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        updateQ.addAll(topoedgeupdateList);
+    }
+
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
@@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements
         notifyQ.add(upd);
     }
 
+    @Override
+    public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+        // NOP
+    }
+
+    @Override
+    public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+        // Remove pending edge updates for the given node connector.
+        // Pending events should be notified if the node connector exists.
+        boolean doFlush = !type.equals(UpdateType.REMOVED);
+        removePendingEvent(nc, doFlush);
+    }
+
     @Override
     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
         if (cacheName.equals(TOPOEDGESDB)) {
@@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements
         return result;
     }
 
+    // Only for unit test.
+    void startTest() {
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
+        updateThread.start();
+    }
+
+    void stopTest() {
+        shuttingDown = true;
+        updateThread.interrupt();
+        pendingTimer.cancel();
+    }
+
+    boolean flushUpdateQueue(long timeout) {
+        long limit = System.currentTimeMillis() + timeout;
+        long cur;
+        do {
+            if (updateQ.peek() == null) {
+                return true;
+            }
+
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                break;
+            }
+            cur = System.currentTimeMillis();
+        } while (cur < limit);
+
+        return false;
+    }
 }
index d1338bf..600f1d8 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.topologymanager.internal;
 
 import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
@@ -50,6 +52,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 public class TopologyManagerImplTest {
+    private TopologyManagerImpl topoManagerImpl;
+
     /**
      * Mockup of switch manager that only maintains existence of node
      * connector.
@@ -78,6 +82,11 @@ public class TopologyManagerImplTest {
             }
         }
 
+        private void clear() {
+            nodeSet.clear();
+            nodeConnectorSet.clear();
+        }
+
         @Override
         public Status addSubnet(SubnetConfig configObject) {
             return null;
@@ -325,6 +334,20 @@ public class TopologyManagerImplTest {
         }
     }
 
+    @Before
+    public void setUp() {
+        topoManagerImpl = new TopologyManagerImpl();
+        topoManagerImpl.startTest();
+    }
+
+    @After
+    public void tearDown() {
+        if (topoManagerImpl != null) {
+            topoManagerImpl.stopTest();
+            topoManagerImpl = null;
+        }
+    }
+
     /*
      * Sets the node, edges and properties for edges here: Edge <SwitchId :
      * NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
@@ -375,11 +398,12 @@ public class TopologyManagerImplTest {
             topoedgeupdateList.add(teu2);
             topoManagerImpl.edgeUpdate(topoedgeupdateList);
         }
+
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
     }
 
     @Test
     public void testGetNodeEdges() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
@@ -412,7 +436,6 @@ public class TopologyManagerImplTest {
 
     @Test
     public void testGetEdges() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
@@ -496,7 +519,6 @@ public class TopologyManagerImplTest {
         TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
                 "OF|10@OF|20", "OF|10@OF|30");
 
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -529,7 +551,6 @@ public class TopologyManagerImplTest {
     public void testGetUserLink() {
         TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
         TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -614,7 +635,6 @@ public class TopologyManagerImplTest {
     @Test
     public void testHostLinkMethods() throws ConstructionException,
     UnknownHostException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -678,7 +698,6 @@ public class TopologyManagerImplTest {
     @Test
     public void testGetNodesWithNodeConnectorHost()
             throws ConstructionException, UnknownHostException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -738,7 +757,6 @@ public class TopologyManagerImplTest {
 
     @Test
     public void bug1348FixTest() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -763,7 +781,91 @@ public class TopologyManagerImplTest {
             Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
         }
 
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
         Assert.assertEquals(1, topoManagerImpl.getEdges().size());
         Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
     }
+
+    @Test
+    public void testNotifyNodeConnector() throws ConstructionException {
+        TestSwitchManager swMgr = new TestSwitchManager();
+        topoManagerImpl.setSwitchManager(swMgr);
+        topoManagerImpl.nonClusterObjectCreate();
+
+        // Test NodeConnector notification in the case that there are no
+        // related edge updates.
+        NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector(
+                (short) 1, NodeCreator.createOFNode(1000L));
+        Map<String, Property> propMap = new HashMap<>();
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        swMgr.clear();
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        // Test NodeConnector notification in the case that there is a related
+        // edge update just before the notification.
+        NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector(
+                (short) 2, NodeCreator.createOFNode(2000L));
+        Edge edge1 = new Edge(nc1, nc2);
+        Edge edge2 = new Edge(nc2, nc1);
+        Set<Property> props = new HashSet<Property>();
+        TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+        TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+        List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        swMgr.addNodeConnectors(nc2);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(2, topoManagerImpl.getEdges().size());
+
+        teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+        teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+        topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+
+        swMgr.clear();
+
+        // Test NodeConnector notification in the case that there are multiple
+        // edge updates related to the NodeConnector just before the notification.
+        teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+        teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+        TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED);
+        TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED);
+        TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+        TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+        topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoedgeupdateList.add(teu3);
+        topoedgeupdateList.add(teu4);
+        topoedgeupdateList.add(teu5);
+        topoedgeupdateList.add(teu6);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        swMgr.addNodeConnectors(nc2);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+    }
 }