import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
IConfigurationContainerAware,
IListenTopoUpdates,
IObjectReader,
+ IInventoryListener,
CommandProvider {
protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+ private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
private ITopologyService topoService;
private IClusterContainerServices clusterContainerService;
private IConfigurationContainerService configurationService;
private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
private volatile Boolean shuttingDown = false;
private Thread notifyThread;
+ private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+ new HashMap<NodeConnector, List<PendingUpdateTask>>();
+ private final BlockingQueue<TopoEdgeUpdate> updateQ =
+ new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private Timer pendingTimer;
+ private Thread updateThread;
+
+ private class PendingEdgeUpdate extends TopoEdgeUpdate {
+ private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+ super(e, p, t);
+ }
+ }
+
+ private class UpdateTopology implements Runnable {
+ @Override
+ public void run() {
+ log.trace("Start topology update thread");
+
+ while (!shuttingDown) {
+ try {
+ List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+ TopoEdgeUpdate teu = updateQ.take();
+ for (; teu != null; teu = updateQ.poll()) {
+ list.add(teu);
+ }
+
+ if (!list.isEmpty()) {
+ log.trace("Update edges: {}", list);
+ doEdgeUpdate(list);
+ }
+ } catch (InterruptedException e) {
+ if (shuttingDown) {
+ break;
+ }
+ log.warn("Topology update thread interrupted", e);
+ } catch (Exception e) {
+ log.error("Exception on topology update thread", e);
+ }
+ }
+
+ log.trace("Exit topology update thread");
+ }
+ }
+
+ private class PendingUpdateTask extends TimerTask {
+ private final Edge edge;
+ private final Set<Property> props;
+ private final UpdateType type;
+
+ private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+ edge = e;
+ props = p;
+ type = t;
+ }
+
+ private NodeConnector getHeadNodeConnector() {
+ return edge.getHeadNodeConnector();
+ }
+
+ private void flush() {
+ log.info("Flush pending topology update: edge {}, type {}",
+ edge, type);
+ updateQ.add(new PendingEdgeUpdate(edge, props, type));
+ }
+ @Override
+ public void run() {
+ if (removePendingEvent(this)) {
+ log.warn("Pending topology update timed out: edge{}, type {}",
+ edge, type);
+ }
+ }
+ }
void nonClusterObjectCreate() {
edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
// Restore the shuttingDown status on init of the component
shuttingDown = false;
notifyThread = new Thread(new TopologyNotify(notifyQ));
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
}
@SuppressWarnings({ "unchecked" })
*
*/
void started() {
+ updateThread.start();
+
// Start the batcher thread for the cluster wide topology updates
notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
void stop() {
shuttingDown = true;
+ updateThread.interrupt();
notifyThread.interrupt();
+ pendingTimer.cancel();
}
/**
*
*/
void destroy() {
+ updateQ.clear();
+ updateThread = null;
+ pendingTimer = null;
notifyQ.clear();
notifyThread = null;
}
return (switchManager.doesNodeConnectorExist(head));
}
+ private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list == null) {
+ list = new LinkedList<PendingUpdateTask>();
+ pendingUpdates.put(head, list);
+ }
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ }
+ }
+
+ private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ log.warn("Enqueue edge update: edge {}, type {}", e, t);
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean removePendingEvent(PendingUpdateTask t) {
+ t.cancel();
+ NodeConnector head = t.getHeadNodeConnector();
+ boolean removed = false;
+
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ removed = list.remove(t);
+ if (list.isEmpty()) {
+ pendingUpdates.remove(head);
+ }
+ }
+ }
+
+ return removed;
+ }
+
+ private void removePendingEvent(NodeConnector head, boolean doFlush) {
+ List<PendingUpdateTask> list;
+ synchronized (pendingUpdates) {
+ list = pendingUpdates.remove(head);
+ }
+
+ if (list != null) {
+ for (PendingUpdateTask task : list) {
+ if (task.cancel() && doFlush) {
+ task.flush();
+ }
+ }
+ pendingTimer.purge();
+ }
+ }
+
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
- switch (type) {
- case ADDED:
+ return edgeUpdate(e, type, props, false);
+ }
+ private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+ if (!type.equals(UpdateType.ADDED) &&
+ enqueueEventIfPending(e, props, type)) {
+ return null;
+ }
+ switch (type) {
+ case ADDED:
if (this.edgesDB.containsKey(e)) {
// Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
log.trace("Skipping redundant edge addition: {}", e);
return null;
}
+ // Ensure that head node connector exists
+ if (!isPending) {
+ if (headNodeConnectorExist(e)) {
+ removePendingEvent(e.getHeadNodeConnector(), true);
+ } else {
+ log.warn("Ignore edge that contains invalid node connector: {}",
+ e);
+ addPendingEvent(e, props, type);
+ return null;
+ }
+ }
+
// Make sure the props are non-null or create a copy
if (props == null) {
props = new HashSet<Property>();
props = new HashSet<Property>(props);
}
-
- // Ensure that head node connector exists
- if (!headNodeConnectorExist(e)) {
- log.warn("Ignore edge that contains invalid node connector: {}", e);
- return null;
- }
-
// Check if nodeConnectors of the edge were correctly categorized
// by protocol plugin
crossCheckNodeConnectors(e);
return new TopoEdgeUpdate(e, props, type);
}
- @Override
- public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
- for (int i = 0; i < topoedgeupdateList.size(); i++) {
- Edge e = topoedgeupdateList.get(i).getEdge();
- Set<Property> p = topoedgeupdateList.get(i).getProperty();
- UpdateType type = topoedgeupdateList.get(i).getUpdateType();
- TopoEdgeUpdate teu = edgeUpdate(e, type, p);
- if (teu != null) {
- teuList.add(teu);
+ for (TopoEdgeUpdate teu : topoedgeupdateList) {
+ boolean isPending = (teu instanceof PendingEdgeUpdate);
+ Edge e = teu.getEdge();
+ Set<Property> p = teu.getProperty();
+ UpdateType type = teu.getUpdateType();
+ TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+ if (update != null) {
+ teuList.add(update);
}
}
}
}
+ @Override
+ public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ updateQ.addAll(topoedgeupdateList);
+ }
+
private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
notifyQ.add(upd);
}
+ @Override
+ public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+ // NOP
+ }
+
+ @Override
+ public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+ // Remove pending edge updates for the given node connector.
+ // Pending events should be notified if the node connector exists.
+ boolean doFlush = !type.equals(UpdateType.REMOVED);
+ removePendingEvent(nc, doFlush);
+ }
+
@Override
public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
if (cacheName.equals(TOPOEDGESDB)) {
return result;
}
+ // Only for unit test.
+ void startTest() {
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
+ updateThread.start();
+ }
+
+ void stopTest() {
+ shuttingDown = true;
+ updateThread.interrupt();
+ pendingTimer.cancel();
+ }
+
+ boolean flushUpdateQueue(long timeout) {
+ long limit = System.currentTimeMillis() + timeout;
+ long cur;
+ do {
+ if (updateQ.peek() == null) {
+ return true;
+ }
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ cur = System.currentTimeMillis();
+ } while (cur < limit);
+
+ return false;
+ }
}
package org.opendaylight.controller.topologymanager.internal;
import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.core.Bandwidth;
import org.opendaylight.controller.sal.core.ConstructionException;
import java.util.concurrent.ConcurrentMap;
public class TopologyManagerImplTest {
+ private TopologyManagerImpl topoManagerImpl;
+
/**
* Mockup of switch manager that only maintains existence of node
* connector.
}
}
+ private void clear() {
+ nodeSet.clear();
+ nodeConnectorSet.clear();
+ }
+
@Override
public Status addSubnet(SubnetConfig configObject) {
return null;
}
}
+ @Before
+ public void setUp() {
+ topoManagerImpl = new TopologyManagerImpl();
+ topoManagerImpl.startTest();
+ }
+
+ @After
+ public void tearDown() {
+ if (topoManagerImpl != null) {
+ topoManagerImpl.stopTest();
+ topoManagerImpl = null;
+ }
+ }
+
/*
* Sets the node, edges and properties for edges here: Edge <SwitchId :
* NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
topoedgeupdateList.add(teu2);
topoManagerImpl.edgeUpdate(topoedgeupdateList);
}
+
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
}
@Test
public void testGetNodeEdges() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
setNodeEdges(topoManagerImpl, swMgr);
@Test
public void testGetEdges() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
setNodeEdges(topoManagerImpl, swMgr);
TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
"OF|10@OF|20", "OF|10@OF|30");
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
public void testGetUserLink() {
TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void testHostLinkMethods() throws ConstructionException,
UnknownHostException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void testGetNodesWithNodeConnectorHost()
throws ConstructionException, UnknownHostException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void bug1348FixTest() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
}
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
Assert.assertEquals(1, topoManagerImpl.getEdges().size());
Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
}
+
+ @Test
+ public void testNotifyNodeConnector() throws ConstructionException {
+ TestSwitchManager swMgr = new TestSwitchManager();
+ topoManagerImpl.setSwitchManager(swMgr);
+ topoManagerImpl.nonClusterObjectCreate();
+
+ // Test NodeConnector notification in the case that there are no
+ // related edge updates.
+ NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector(
+ (short) 1, NodeCreator.createOFNode(1000L));
+ Map<String, Property> propMap = new HashMap<>();
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ swMgr.clear();
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ // Test NodeConnector notification in the case that there is a related
+ // edge update just before the notification.
+ NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector(
+ (short) 2, NodeCreator.createOFNode(2000L));
+ Edge edge1 = new Edge(nc1, nc2);
+ Edge edge2 = new Edge(nc2, nc1);
+ Set<Property> props = new HashSet<Property>();
+ TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+ TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+ List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ swMgr.addNodeConnectors(nc2);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(2, topoManagerImpl.getEdges().size());
+
+ teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+ teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+ topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+
+ swMgr.clear();
+
+ // Test NodeConnector notification in the case that there are multiple
+ // edge updates related to the NodeConnector just before the notification.
+ teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+ teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+ TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED);
+ TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED);
+ TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+ TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+ topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoedgeupdateList.add(teu3);
+ topoedgeupdateList.add(teu4);
+ topoedgeupdateList.add(teu5);
+ topoedgeupdateList.add(teu6);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ swMgr.addNodeConnectors(nc2);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ }
}