import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
IConfigurationContainerAware,
IListenTopoUpdates,
IObjectReader,
+ IInventoryListener,
CommandProvider {
protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+ private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
private ITopologyService topoService;
private IClusterContainerServices clusterContainerService;
private IConfigurationContainerService configurationService;
private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
private volatile Boolean shuttingDown = false;
private Thread notifyThread;
+ private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+ new HashMap<NodeConnector, List<PendingUpdateTask>>();
+ private final BlockingQueue<TopoEdgeUpdate> updateQ =
+ new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private Timer pendingTimer;
+ private Thread updateThread;
+
+ private class PendingEdgeUpdate extends TopoEdgeUpdate {
+ private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+ super(e, p, t);
+ }
+ }
+
+ private class UpdateTopology implements Runnable {
+ @Override
+ public void run() {
+ log.trace("Start topology update thread");
+
+ while (!shuttingDown) {
+ try {
+ List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+ TopoEdgeUpdate teu = updateQ.take();
+ for (; teu != null; teu = updateQ.poll()) {
+ list.add(teu);
+ }
+
+ if (!list.isEmpty()) {
+ log.trace("Update edges: {}", list);
+ doEdgeUpdate(list);
+ }
+ } catch (InterruptedException e) {
+ if (shuttingDown) {
+ break;
+ }
+ log.warn("Topology update thread interrupted", e);
+ } catch (Exception e) {
+ log.error("Exception on topology update thread", e);
+ }
+ }
+
+ log.trace("Exit topology update thread");
+ }
+ }
+
+ private class PendingUpdateTask extends TimerTask {
+ private final Edge edge;
+ private final Set<Property> props;
+ private final UpdateType type;
+
+ private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+ edge = e;
+ props = p;
+ type = t;
+ }
+
+ private NodeConnector getHeadNodeConnector() {
+ return edge.getHeadNodeConnector();
+ }
+
+ private void flush() {
+ log.info("Flush pending topology update: edge {}, type {}",
+ edge, type);
+ updateQ.add(new PendingEdgeUpdate(edge, props, type));
+ }
+ @Override
+ public void run() {
+ if (removePendingEvent(this)) {
+ log.warn("Pending topology update timed out: edge{}, type {}",
+ edge, type);
+ }
+ }
+ }
void nonClusterObjectCreate() {
edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
// Restore the shuttingDown status on init of the component
shuttingDown = false;
notifyThread = new Thread(new TopologyNotify(notifyQ));
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
}
@SuppressWarnings({ "unchecked" })
*
*/
void started() {
+ updateThread.start();
+
// Start the batcher thread for the cluster wide topology updates
notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
void stop() {
shuttingDown = true;
+ updateThread.interrupt();
notifyThread.interrupt();
+ pendingTimer.cancel();
}
/**
*
*/
void destroy() {
+ updateQ.clear();
+ updateThread = null;
+ pendingTimer = null;
notifyQ.clear();
notifyThread = null;
}
return (switchManager.doesNodeConnectorExist(head));
}
+ private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list == null) {
+ list = new LinkedList<PendingUpdateTask>();
+ pendingUpdates.put(head, list);
+ }
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ }
+ }
+
+ private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ log.warn("Enqueue edge update: edge {}, type {}", e, t);
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean removePendingEvent(PendingUpdateTask t) {
+ t.cancel();
+ NodeConnector head = t.getHeadNodeConnector();
+ boolean removed = false;
+
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ removed = list.remove(t);
+ if (list.isEmpty()) {
+ pendingUpdates.remove(head);
+ }
+ }
+ }
+
+ return removed;
+ }
+
+ private void removePendingEvent(NodeConnector head, boolean doFlush) {
+ List<PendingUpdateTask> list;
+ synchronized (pendingUpdates) {
+ list = pendingUpdates.remove(head);
+ }
+
+ if (list != null) {
+ for (PendingUpdateTask task : list) {
+ if (task.cancel() && doFlush) {
+ task.flush();
+ }
+ }
+ pendingTimer.purge();
+ }
+ }
+
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
- switch (type) {
- case ADDED:
+ return edgeUpdate(e, type, props, false);
+ }
+ private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+ if (!type.equals(UpdateType.ADDED) &&
+ enqueueEventIfPending(e, props, type)) {
+ return null;
+ }
+ switch (type) {
+ case ADDED:
if (this.edgesDB.containsKey(e)) {
// Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
log.trace("Skipping redundant edge addition: {}", e);
return null;
}
+ // Ensure that head node connector exists
+ if (!isPending) {
+ if (headNodeConnectorExist(e)) {
+ removePendingEvent(e.getHeadNodeConnector(), true);
+ } else {
+ log.warn("Ignore edge that contains invalid node connector: {}",
+ e);
+ addPendingEvent(e, props, type);
+ return null;
+ }
+ }
+
// Make sure the props are non-null or create a copy
if (props == null) {
props = new HashSet<Property>();
props = new HashSet<Property>(props);
}
-
- // Ensure that head node connector exists
- if (!headNodeConnectorExist(e)) {
- log.warn("Ignore edge that contains invalid node connector: {}", e);
- return null;
- }
-
// Check if nodeConnectors of the edge were correctly categorized
// by protocol plugin
crossCheckNodeConnectors(e);
return new TopoEdgeUpdate(e, props, type);
}
- @Override
- public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
- for (int i = 0; i < topoedgeupdateList.size(); i++) {
- Edge e = topoedgeupdateList.get(i).getEdge();
- Set<Property> p = topoedgeupdateList.get(i).getProperty();
- UpdateType type = topoedgeupdateList.get(i).getUpdateType();
- TopoEdgeUpdate teu = edgeUpdate(e, type, p);
- if (teu != null) {
- teuList.add(teu);
+ for (TopoEdgeUpdate teu : topoedgeupdateList) {
+ boolean isPending = (teu instanceof PendingEdgeUpdate);
+ Edge e = teu.getEdge();
+ Set<Property> p = teu.getProperty();
+ UpdateType type = teu.getUpdateType();
+ TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+ if (update != null) {
+ teuList.add(update);
}
}
}
}
+ @Override
+ public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ updateQ.addAll(topoedgeupdateList);
+ }
+
private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
notifyQ.add(upd);
}
+ @Override
+ public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+ // NOP
+ }
+
+ @Override
+ public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+ // Remove pending edge updates for the given node connector.
+ // Pending events should be notified if the node connector exists.
+ boolean doFlush = !type.equals(UpdateType.REMOVED);
+ removePendingEvent(nc, doFlush);
+ }
+
@Override
public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
if (cacheName.equals(TOPOEDGESDB)) {
return result;
}
+ // Only for unit test.
+ void startTest() {
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
+ updateThread.start();
+ }
+
+ void stopTest() {
+ shuttingDown = true;
+ updateThread.interrupt();
+ pendingTimer.cancel();
+ }
+
+ boolean flushUpdateQueue(long timeout) {
+ long limit = System.currentTimeMillis() + timeout;
+ long cur;
+ do {
+ if (updateQ.peek() == null) {
+ return true;
+ }
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ cur = System.currentTimeMillis();
+ } while (cur < limit);
+
+ return false;
+ }
}