+ private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+ new HashMap<NodeConnector, List<PendingUpdateTask>>();
+ private final BlockingQueue<TopoEdgeUpdate> updateQ =
+ new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private Timer pendingTimer;
+ private Thread updateThread;
+
+ private class PendingEdgeUpdate extends TopoEdgeUpdate {
+ private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+ super(e, p, t);
+ }
+ }
+
+ private class UpdateTopology implements Runnable {
+ @Override
+ public void run() {
+ log.trace("Start topology update thread");
+
+ while (!shuttingDown) {
+ try {
+ List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+ TopoEdgeUpdate teu = updateQ.take();
+ for (; teu != null; teu = updateQ.poll()) {
+ list.add(teu);
+ }
+
+ if (!list.isEmpty()) {
+ log.trace("Update edges: {}", list);
+ doEdgeUpdate(list);
+ }
+ } catch (InterruptedException e) {
+ if (shuttingDown) {
+ break;
+ }
+ log.warn("Topology update thread interrupted", e);
+ } catch (Exception e) {
+ log.error("Exception on topology update thread", e);
+ }
+ }
+
+ log.trace("Exit topology update thread");
+ }
+ }
+
+ private class PendingUpdateTask extends TimerTask {
+ private final Edge edge;
+ private final Set<Property> props;
+ private final UpdateType type;
+
+ private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+ edge = e;
+ props = p;
+ type = t;
+ }
+
+ private NodeConnector getHeadNodeConnector() {
+ return edge.getHeadNodeConnector();
+ }
+
+ private void flush() {
+ log.info("Flush pending topology update: edge {}, type {}",
+ edge, type);
+ updateQ.add(new PendingEdgeUpdate(edge, props, type));
+ }