From: Hideyuki Tai Date: Sat, 15 Nov 2014 00:55:46 +0000 (-0500) Subject: Bug 2158: Fixed TopologyManager for edge updates. X-Git-Tag: release/lithium~734^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1cdbaaa2b55a8be4948d782ae32d7cf278cf14cf Bug 2158: Fixed TopologyManager for edge updates. MD-SAL does not keep the order of notification of data changes. As a result, TopologyManager sometimes receives a notification of an edge addition before it is notified of the addition of the NodeConnector which is the head of the edge. In such a case, TopologyManager ignores the edge notification, and fails to create correct topology information. This is one of the root causes of Bug 2158. To handle this issue, this patch fixes TopologyManager to queue edge notifications and wait for the corresponding NodeConnector updates. Change-Id: Ifeabc91d856eb9cc88ae3595d14e0a6819ec7454 Signed-off-by: Hideyuki Tai --- diff --git a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java index 80d7083ec0..8c422a52ea 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java +++ b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. 
* @@ -22,6 +21,7 @@ import org.opendaylight.controller.configuration.IConfigurationContainerService; import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase; import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; +import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; @@ -73,6 +73,7 @@ public class Activator extends ComponentActivatorAbstractBase { props.put("cachenames", propSet); c.setInterface(new String[] { IListenTopoUpdates.class.getName(), + IInventoryListener.class.getName(), ITopologyManager.class.getName(), ITopologyManagerShell.class.getName(), IConfigurationContainerAware.class.getName(), diff --git a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index 659ee7dd81..e1a0ca1e76 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import 
org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; @@ -58,6 +59,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -76,6 +79,7 @@ public class TopologyManagerImpl implements IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, + IInventoryListener, CommandProvider { protected static final String TOPOEDGESDB = "topologymanager.edgesDB"; protected static final String TOPOHOSTSDB = "topologymanager.hostsDB"; @@ -83,6 +87,8 @@ public class TopologyManagerImpl implements protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; private static final String USER_LINKS_FILE_NAME = "userTopology.conf"; private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); + private static final long PENDING_UPDATE_TIMEOUT = 5000L; + private ITopologyService topoService; private IClusterContainerServices clusterContainerService; private IConfigurationContainerService configurationService; @@ -104,7 +110,79 @@ public class TopologyManagerImpl implements private BlockingQueue notifyQ = new LinkedBlockingQueue(); private volatile Boolean shuttingDown = false; private Thread notifyThread; + private final Map> pendingUpdates = + new HashMap>(); + private final BlockingQueue updateQ = + new LinkedBlockingQueue(); + private Timer pendingTimer; + private Thread updateThread; + + private class PendingEdgeUpdate extends TopoEdgeUpdate { + private PendingEdgeUpdate(Edge e, Set p, UpdateType t) { + super(e, p, t); + } + } + + private class UpdateTopology implements Runnable { + @Override + public void run() { + log.trace("Start topology update thread"); + + while (!shuttingDown) { + try { + List list = new ArrayList(); + 
TopoEdgeUpdate teu = updateQ.take(); + for (; teu != null; teu = updateQ.poll()) { + list.add(teu); + } + + if (!list.isEmpty()) { + log.trace("Update edges: {}", list); + doEdgeUpdate(list); + } + } catch (InterruptedException e) { + if (shuttingDown) { + break; + } + log.warn("Topology update thread interrupted", e); + } catch (Exception e) { + log.error("Exception on topology update thread", e); + } + } + + log.trace("Exit topology update thread"); + } + } + + private class PendingUpdateTask extends TimerTask { + private final Edge edge; + private final Set props; + private final UpdateType type; + + private PendingUpdateTask(Edge e, Set p, UpdateType t) { + edge = e; + props = p; + type = t; + } + + private NodeConnector getHeadNodeConnector() { + return edge.getHeadNodeConnector(); + } + + private void flush() { + log.info("Flush pending topology update: edge {}, type {}", + edge, type); + updateQ.add(new PendingEdgeUpdate(edge, props, type)); + } + @Override + public void run() { + if (removePendingEvent(this)) { + log.warn("Pending topology update timed out: edge{}, type {}", + edge, type); + } + } + } void nonClusterObjectCreate() { edgesDB = new ConcurrentHashMap>(); @@ -210,6 +288,8 @@ public class TopologyManagerImpl implements // Restore the shuttingDown status on init of the component shuttingDown = false; notifyThread = new Thread(new TopologyNotify(notifyQ)); + pendingTimer = new Timer("Topology Pending Update Timer"); + updateThread = new Thread(new UpdateTopology(), "Topology Update"); } @SuppressWarnings({ "unchecked" }) @@ -277,6 +357,8 @@ public class TopologyManagerImpl implements * */ void started() { + updateThread.start(); + // Start the batcher thread for the cluster wide topology updates notifyThread.start(); // SollicitRefresh MUST be called here else if called at init @@ -287,7 +369,9 @@ public class TopologyManagerImpl implements void stop() { shuttingDown = true; + updateThread.interrupt(); notifyThread.interrupt(); + 
pendingTimer.cancel(); } /** @@ -297,6 +381,9 @@ public class TopologyManagerImpl implements * */ void destroy() { + updateQ.clear(); + updateThread = null; + pendingTimer = null; notifyQ.clear(); notifyThread = null; } @@ -571,17 +658,100 @@ public class TopologyManagerImpl implements return (switchManager.doesNodeConnectorExist(head)); } + private void addPendingEvent(Edge e, Set p, UpdateType t) { + NodeConnector head = e.getHeadNodeConnector(); + PendingUpdateTask task = new PendingUpdateTask(e, p, t); + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list == null) { + list = new LinkedList(); + pendingUpdates.put(head, list); + } + list.add(task); + pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT); + } + } + + private boolean enqueueEventIfPending(Edge e, Set p, UpdateType t) { + NodeConnector head = e.getHeadNodeConnector(); + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list != null) { + log.warn("Enqueue edge update: edge {}, type {}", e, t); + PendingUpdateTask task = new PendingUpdateTask(e, p, t); + list.add(task); + pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT); + return true; + } + } + + return false; + } + + private boolean removePendingEvent(PendingUpdateTask t) { + t.cancel(); + NodeConnector head = t.getHeadNodeConnector(); + boolean removed = false; + + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list != null) { + removed = list.remove(t); + if (list.isEmpty()) { + pendingUpdates.remove(head); + } + } + } + + return removed; + } + + private void removePendingEvent(NodeConnector head, boolean doFlush) { + List list; + synchronized (pendingUpdates) { + list = pendingUpdates.remove(head); + } + + if (list != null) { + for (PendingUpdateTask task : list) { + if (task.cancel() && doFlush) { + task.flush(); + } + } + pendingTimer.purge(); + } + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { - switch (type) { - case 
ADDED: + return edgeUpdate(e, type, props, false); + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props, boolean isPending) { + if (!type.equals(UpdateType.ADDED) && + enqueueEventIfPending(e, props, type)) { + return null; + } + switch (type) { + case ADDED: if (this.edgesDB.containsKey(e)) { // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks log.trace("Skipping redundant edge addition: {}", e); return null; } + // Ensure that head node connector exists + if (!isPending) { + if (headNodeConnectorExist(e)) { + removePendingEvent(e.getHeadNodeConnector(), true); + } else { + log.warn("Ignore edge that contains invalid node connector: {}", + e); + addPendingEvent(e, props, type); + return null; + } + } + // Make sure the props are non-null or create a copy if (props == null) { props = new HashSet(); @@ -589,13 +759,6 @@ public class TopologyManagerImpl implements props = new HashSet(props); } - - // Ensure that head node connector exists - if (!headNodeConnectorExist(e)) { - log.warn("Ignore edge that contains invalid node connector: {}", e); - return null; - } - // Check if nodeConnectors of the edge were correctly categorized // by protocol plugin crossCheckNodeConnectors(e); @@ -702,16 +865,16 @@ public class TopologyManagerImpl implements return new TopoEdgeUpdate(e, props, type); } - @Override - public void edgeUpdate(List topoedgeupdateList) { + private void doEdgeUpdate(List topoedgeupdateList) { List teuList = new ArrayList(); - for (int i = 0; i < topoedgeupdateList.size(); i++) { - Edge e = topoedgeupdateList.get(i).getEdge(); - Set p = topoedgeupdateList.get(i).getProperty(); - UpdateType type = topoedgeupdateList.get(i).getUpdateType(); - TopoEdgeUpdate teu = edgeUpdate(e, type, p); - if (teu != null) { - teuList.add(teu); + for (TopoEdgeUpdate teu : topoedgeupdateList) { + boolean isPending = (teu instanceof PendingEdgeUpdate); + Edge e = teu.getEdge(); + Set p = teu.getProperty(); + 
UpdateType type = teu.getUpdateType(); + TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending); + if (update != null) { + teuList.add(update); } } @@ -727,6 +890,11 @@ public class TopologyManagerImpl implements } } + @Override + public void edgeUpdate(List topoedgeupdateList) { + updateQ.addAll(topoedgeupdateList); + } + private Edge getReverseLinkTuple(TopologyUserLinkConfig link) { TopologyUserLinkConfig rLink = new TopologyUserLinkConfig( link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector()); @@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements notifyQ.add(upd); } + @Override + public void notifyNode(Node node, UpdateType type, Map propMap) { + // NOP + } + + @Override + public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map propMap) { + // Remove pending edge updates for the given node connector. + // Pending events should be notified if the node connector exists. + boolean doFlush = !type.equals(UpdateType.REMOVED); + removePendingEvent(nc, doFlush); + } + @Override public void entryCreated(final Object key, final String cacheName, final boolean originLocal) { if (cacheName.equals(TOPOEDGESDB)) { @@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements return result; } + // Only for unit test. 
+ void startTest() { + pendingTimer = new Timer("Topology Pending Update Timer"); + updateThread = new Thread(new UpdateTopology(), "Topology Update"); + updateThread.start(); + } + + void stopTest() { + shuttingDown = true; + updateThread.interrupt(); + pendingTimer.cancel(); + } + + boolean flushUpdateQueue(long timeout) { + long limit = System.currentTimeMillis() + timeout; + long cur; + do { + if (updateQ.peek() == null) { + return true; + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + cur = System.currentTimeMillis(); + } while (cur < limit); + + return false; + } } diff --git a/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java b/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java index d1338bf695..600f1d8cbf 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java +++ b/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.topologymanager.internal; import org.junit.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.sal.core.Bandwidth; import org.opendaylight.controller.sal.core.ConstructionException; @@ -50,6 +52,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; public class TopologyManagerImplTest { + private TopologyManagerImpl topoManagerImpl; + /** * Mockup of switch manager that only maintains existence of node * connector. 
@@ -78,6 +82,11 @@ public class TopologyManagerImplTest { } } + private void clear() { + nodeSet.clear(); + nodeConnectorSet.clear(); + } + @Override public Status addSubnet(SubnetConfig configObject) { return null; @@ -325,6 +334,20 @@ public class TopologyManagerImplTest { } } + @Before + public void setUp() { + topoManagerImpl = new TopologyManagerImpl(); + topoManagerImpl.startTest(); + } + + @After + public void tearDown() { + if (topoManagerImpl != null) { + topoManagerImpl.stopTest(); + topoManagerImpl = null; + } + } + /* * Sets the node, edges and properties for edges here: Edge : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>; @@ -375,11 +398,12 @@ public class TopologyManagerImplTest { topoedgeupdateList.add(teu2); topoManagerImpl.edgeUpdate(topoedgeupdateList); } + + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); } @Test public void testGetNodeEdges() throws ConstructionException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); setNodeEdges(topoManagerImpl, swMgr); @@ -412,7 +436,6 @@ public class TopologyManagerImplTest { @Test public void testGetEdges() throws ConstructionException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); setNodeEdges(topoManagerImpl, swMgr); @@ -496,7 +519,6 @@ public class TopologyManagerImplTest { TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20", "OF|10@OF|20", "OF|10@OF|30"); - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -529,7 +551,6 @@ public class TopologyManagerImplTest { public void testGetUserLink() { TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5]; TopologyUserLinkConfig[] reverseLink = new 
TopologyUserLinkConfig[5]; - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -614,7 +635,6 @@ public class TopologyManagerImplTest { @Test public void testHostLinkMethods() throws ConstructionException, UnknownHostException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -678,7 +698,6 @@ public class TopologyManagerImplTest { @Test public void testGetNodesWithNodeConnectorHost() throws ConstructionException, UnknownHostException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -738,7 +757,6 @@ public class TopologyManagerImplTest { @Test public void bug1348FixTest() throws ConstructionException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -763,7 +781,91 @@ public class TopologyManagerImplTest { Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage()); } + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); Assert.assertEquals(1, topoManagerImpl.getEdges().size()); Assert.assertNotNull(topoManagerImpl.getEdges().get(edge)); } + + @Test + public void testNotifyNodeConnector() throws ConstructionException { + TestSwitchManager swMgr = new TestSwitchManager(); + topoManagerImpl.setSwitchManager(swMgr); + topoManagerImpl.nonClusterObjectCreate(); + + // Test NodeConnector notification in the case that there are no + // related edge updates. 
+ NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector( + (short) 1, NodeCreator.createOFNode(1000L)); + Map propMap = new HashMap<>(); + swMgr.addNodeConnectors(nc1); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + + swMgr.clear(); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + + // Test NodeConnector notification in the case that there is a related + // edge update just before the notification. + NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector( + (short) 2, NodeCreator.createOFNode(2000L)); + Edge edge1 = new Edge(nc1, nc2); + Edge edge2 = new Edge(nc2, nc1); + Set props = new HashSet(); + TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED); + TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED); + List topoedgeupdateList = new ArrayList(); + topoedgeupdateList.add(teu1); + topoedgeupdateList.add(teu2); + topoManagerImpl.edgeUpdate(topoedgeupdateList); + swMgr.addNodeConnectors(nc1); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap); + swMgr.addNodeConnectors(nc2); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(2, topoManagerImpl.getEdges().size()); + + teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED); + teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED); + topoedgeupdateList = new ArrayList(); + topoedgeupdateList.add(teu1); + topoedgeupdateList.add(teu2); + topoManagerImpl.edgeUpdate(topoedgeupdateList); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + 
topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap); + + swMgr.clear(); + + // Test NodeConnector notification in the case that there are multiple + // edge updates related to the NodeConnector just before the notification. + teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED); + teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED); + TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED); + TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED); + TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED); + TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED); + topoedgeupdateList = new ArrayList(); + topoedgeupdateList.add(teu1); + topoedgeupdateList.add(teu2); + topoedgeupdateList.add(teu3); + topoedgeupdateList.add(teu4); + topoedgeupdateList.add(teu5); + topoedgeupdateList.add(teu6); + topoManagerImpl.edgeUpdate(topoedgeupdateList); + swMgr.addNodeConnectors(nc1); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap); + swMgr.addNodeConnectors(nc2); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + } }