X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fadsal%2Ftopologymanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Ftopologymanager%2Finternal%2FTopologyManagerImpl.java;h=e1a0ca1e7686c39a8e7563e6f50cb81254fe8d99;hp=659ee7dd81ca83d91c013ceddb7017edca9a8b1b;hb=488cc48063a540a046084b398c72e5c58d2c7288;hpb=dd32d3d246ebad8b7c76afb93239a4462f329a6b diff --git a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index 659ee7dd81..e1a0ca1e76 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; @@ -58,6 +59,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -76,6 +79,7 @@ public class TopologyManagerImpl implements IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, + IInventoryListener, CommandProvider { protected static final String TOPOEDGESDB = "topologymanager.edgesDB"; protected static final String TOPOHOSTSDB = "topologymanager.hostsDB"; @@ -83,6 +87,8 @@ public class TopologyManagerImpl implements protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; private static final String USER_LINKS_FILE_NAME = "userTopology.conf"; private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); + private static final long PENDING_UPDATE_TIMEOUT = 5000L; + private ITopologyService topoService; private IClusterContainerServices clusterContainerService; private IConfigurationContainerService configurationService; @@ -104,7 +110,79 @@ public class TopologyManagerImpl implements private BlockingQueue notifyQ = new LinkedBlockingQueue(); private volatile Boolean shuttingDown = false; private Thread notifyThread; + private final Map> pendingUpdates = + new HashMap>(); + private final BlockingQueue updateQ = + new LinkedBlockingQueue(); + private Timer pendingTimer; + private Thread updateThread; + + private class PendingEdgeUpdate extends TopoEdgeUpdate { + private PendingEdgeUpdate(Edge e, Set p, UpdateType t) { + super(e, p, t); + } + } + + private class UpdateTopology implements Runnable { + @Override + public void run() { + log.trace("Start topology update thread"); + + while (!shuttingDown) { + try { + List list = new ArrayList(); + TopoEdgeUpdate teu = updateQ.take(); + for (; teu != null; teu = updateQ.poll()) { + list.add(teu); + } + + if (!list.isEmpty()) { + log.trace("Update edges: {}", list); + doEdgeUpdate(list); + } + } catch (InterruptedException e) { + if (shuttingDown) { + break; + } + log.warn("Topology update thread interrupted", e); + } catch (Exception e) { + log.error("Exception on topology update thread", e); + } + } + + log.trace("Exit topology update thread"); + } + } + + private class PendingUpdateTask extends TimerTask { + private final Edge edge; + private final Set props; + private final UpdateType type; + + private PendingUpdateTask(Edge e, Set p, UpdateType t) { + edge = e; + props = p; + type = t; + } + + private NodeConnector getHeadNodeConnector() { + return edge.getHeadNodeConnector(); + } + + private void flush() { + log.info("Flush pending topology update: edge {}, type {}", + edge, type); + updateQ.add(new PendingEdgeUpdate(edge, props, type)); + } + @Override + public void run() { + if (removePendingEvent(this)) { + log.warn("Pending topology update timed out: edge{}, type {}", + edge, type); + } + } + } void nonClusterObjectCreate() { edgesDB = new ConcurrentHashMap>(); @@ -210,6 +288,8 @@ public class TopologyManagerImpl implements // Restore the shuttingDown status on init of the component shuttingDown = false; notifyThread = new Thread(new TopologyNotify(notifyQ)); + pendingTimer = new Timer("Topology Pending Update Timer"); + updateThread = new Thread(new UpdateTopology(), "Topology Update"); } @SuppressWarnings({ "unchecked" }) @@ -277,6 +357,8 @@ public class TopologyManagerImpl implements * */ void started() { + updateThread.start(); + // Start the batcher thread for the cluster wide topology updates notifyThread.start(); // SollicitRefresh MUST be called here else if called at init @@ -287,7 +369,9 @@ public class TopologyManagerImpl implements void stop() { shuttingDown = true; + updateThread.interrupt(); notifyThread.interrupt(); + pendingTimer.cancel(); } /** @@ -297,6 +381,9 @@ public class TopologyManagerImpl implements * */ void destroy() { + updateQ.clear(); + updateThread = null; + pendingTimer = null; notifyQ.clear(); notifyThread = null; } @@ -571,17 +658,100 @@ public class TopologyManagerImpl implements return (switchManager.doesNodeConnectorExist(head)); } + private void addPendingEvent(Edge e, Set p, UpdateType t) { + NodeConnector head = e.getHeadNodeConnector(); + PendingUpdateTask task = new PendingUpdateTask(e, p, t); + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list == null) { + list = new LinkedList(); + pendingUpdates.put(head, list); + } + list.add(task); + pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT); + } + } + + private boolean enqueueEventIfPending(Edge e, Set p, UpdateType t) { + NodeConnector head = e.getHeadNodeConnector(); + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list != null) { + log.warn("Enqueue edge update: edge {}, type {}", e, t); + PendingUpdateTask task = new PendingUpdateTask(e, p, t); + list.add(task); + pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT); + return true; + } + } + + return false; + } + + private boolean removePendingEvent(PendingUpdateTask t) { + t.cancel(); + NodeConnector head = t.getHeadNodeConnector(); + boolean removed = false; + + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list != null) { + removed = list.remove(t); + if (list.isEmpty()) { + pendingUpdates.remove(head); + } + } + } + + return removed; + } + + private void removePendingEvent(NodeConnector head, boolean doFlush) { + List list; + synchronized (pendingUpdates) { + list = pendingUpdates.remove(head); + } + + if (list != null) { + for (PendingUpdateTask task : list) { + if (task.cancel() && doFlush) { + task.flush(); + } + } + pendingTimer.purge(); + } + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { - switch (type) { - case ADDED: + return edgeUpdate(e, type, props, false); + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props, boolean isPending) { + if (!type.equals(UpdateType.ADDED) && + enqueueEventIfPending(e, props, type)) { + return null; + } + switch (type) { + case ADDED: if (this.edgesDB.containsKey(e)) { // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks log.trace("Skipping redundant edge addition: {}", e); return null; } + // Ensure that head node connector exists + if (!isPending) { + if (headNodeConnectorExist(e)) { + removePendingEvent(e.getHeadNodeConnector(), true); + } else { + log.warn("Ignore edge that contains invalid node connector: {}", + e); + addPendingEvent(e, props, type); + return null; + } + } + // Make sure the props are non-null or create a copy if (props == null) { props = new HashSet(); @@ -589,13 +759,6 @@ public class TopologyManagerImpl implements props = new HashSet(props); } - - // Ensure that head node connector exists - if (!headNodeConnectorExist(e)) { - log.warn("Ignore edge that contains invalid node connector: {}", e); - return null; - } - // Check if nodeConnectors of the edge were correctly categorized // by protocol plugin crossCheckNodeConnectors(e); @@ -702,16 +865,16 @@ public class TopologyManagerImpl implements return new TopoEdgeUpdate(e, props, type); } - @Override - public void edgeUpdate(List topoedgeupdateList) { + private void doEdgeUpdate(List topoedgeupdateList) { List teuList = new ArrayList(); - for (int i = 0; i < topoedgeupdateList.size(); i++) { - Edge e = topoedgeupdateList.get(i).getEdge(); - Set p = topoedgeupdateList.get(i).getProperty(); - UpdateType type = topoedgeupdateList.get(i).getUpdateType(); - TopoEdgeUpdate teu = edgeUpdate(e, type, p); - if (teu != null) { - teuList.add(teu); + for (TopoEdgeUpdate teu : topoedgeupdateList) { + boolean isPending = (teu instanceof PendingEdgeUpdate); + Edge e = teu.getEdge(); + Set p = teu.getProperty(); + UpdateType type = teu.getUpdateType(); + TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending); + if (update != null) { + teuList.add(update); } } @@ -727,6 +890,11 @@ public class TopologyManagerImpl implements } } + @Override + public void edgeUpdate(List topoedgeupdateList) { + updateQ.addAll(topoedgeupdateList); + } + private Edge getReverseLinkTuple(TopologyUserLinkConfig link) { TopologyUserLinkConfig rLink = new TopologyUserLinkConfig( link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector()); @@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements notifyQ.add(upd); } + @Override + public void notifyNode(Node node, UpdateType type, Map propMap) { + // NOP + } + + @Override + public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map propMap) { + // Remove pending edge updates for the given node connector. + // Pending events should be notified if the node connector exists. + boolean doFlush = !type.equals(UpdateType.REMOVED); + removePendingEvent(nc, doFlush); + } + @Override public void entryCreated(final Object key, final String cacheName, final boolean originLocal) { if (cacheName.equals(TOPOEDGESDB)) { @@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements return result; } + // Only for unit test. + void startTest() { + pendingTimer = new Timer("Topology Pending Update Timer"); + updateThread = new Thread(new UpdateTopology(), "Topology Update"); + updateThread.start(); + } + + void stopTest() { + shuttingDown = true; + updateThread.interrupt(); + pendingTimer.cancel(); + } + + boolean flushUpdateQueue(long timeout) { + long limit = System.currentTimeMillis() + timeout; + long cur; + do { + if (updateQ.peek() == null) { + return true; + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + cur = System.currentTimeMillis(); + } while (cur < limit); + + return false; + } }