+ // Not interested in this update
+ }
+
+ public void unsetIConnectionManager(IConnectionManager s) {
+ if (s == this.connectionManager) {
+ this.connectionManager = null;
+ }
+ }
+
+ public void setIConnectionManager(IConnectionManager s) {
+ this.connectionManager = s;
+ }
+
+ @Override
+ public void entryCreated(Object key, String cacheName, boolean originLocal) {
+ /*
+ * Do nothing
+ */
+ }
+
+ @Override
+ public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+ if (originLocal) {
+ /*
+ * Local updates are of no interest
+ */
+ return;
+ }
+ if (cacheName.equals(TRIGGERS_CACHE)) {
+ log.trace("Got a trigger for key {} : value {}", key, new_value);
+ final Node n = (Node) new_value;
+ // check if the node is local to this controller
+ ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
+ if(this.connectionManager != null) {
+ locality = this.connectionManager.getLocalityStatus(n);
+ }
+ if (locality == ConnectionLocality.LOCAL) {
+ log.trace("trigger for node {} processes locally", n);
+ // delete the trigger and proceed with handling the trigger
+ this.triggers.remove(key);
+ // this is a potentially long running task
+ // off load it from the listener thread
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ // the node is local.
+ // call the read service
+ if (reader != null) {
+ List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
+ if (flows != null) {
+ flowStatistics.put(n, flows);
+ }
+ }
+ }
+ };
+ // submit the runnable for execution
+ if(this.triggerExecutor != null) {
+ this.triggerExecutor.execute(r);
+ }
+ }
+ } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
+ // flow statistics cache updated
+ // get the node
+ log.trace("Got a flow statistics cache update for key {}", key);
+ // this is a short running task
+ // no need of off loading from the listener thread
+ final Node n = (Node) key;
+ // check if an outstanding trigger exists for this node
+ CountDownLatch l = this.latches.get(n);
+ if(l != null) {
+ // someone was waiting for this update
+ // let him know
+ l.countDown();
+ }
+ }
+ }
+
+ @Override
+ public void entryDeleted(Object key, String cacheName, boolean originLocal) {
+ /*
+ * Do nothing
+ */