+ private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+ TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+ upd.setLocal(isLocal);
+ notifyQ.add(upd);
+ }
+
+ @Override
+ public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ // This is the case of an Edge being added to the topology DB
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+ }
+ }
+
+ @Override
+ public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+ final Set<Property> props = (Set<Property>) new_value;
+ edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+ }
+ }
+
+ @Override
+ public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+ }
+ }
+
+ class TopologyNotify implements Runnable {
+ private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+ private TopoEdgeUpdate entry;
+ private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+ private boolean notifyListeners;
+
+ TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+ this.notifyQ = notifyQ;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ log.trace("New run of TopologyNotify");
+ notifyListeners = false;
+ // First we block waiting for an element to get in
+ entry = notifyQ.take();
+ // Then we drain the whole queue if elements are
+ // in it without getting into any blocking condition
+ for (; entry != null; entry = notifyQ.poll()) {
+ teuList.add(entry);
+ notifyListeners = true;
+ }
+
+ // Notify listeners only if there were updates drained else
+ // give up
+ if (notifyListeners) {
+ log.trace("Notifier thread, notified a listener");
+ // Now update the listeners
+ for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+ try {
+ s.edgeUpdate(teuList);
+ } catch (Exception exc) {
+ log.error("Exception on edge update:", exc);
+ }
+ }
+ }
+ teuList.clear();
+
+ // Lets sleep for sometime to allow aggregation of event
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ log.warn("TopologyNotify interrupted {}", e1.getMessage());
+ if (shuttingDown) {
+ return;
+ }
+ } catch (Exception e2) {
+ log.error("", e2);
+ }
+ }
+ }
+ }