+
+ private void enqueueARPCacheEvent(ARPEvent event, boolean new_value) {
+ try {
+ ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
+ if (!ARPCacheEvents.contains(cacheEvent)) {
+ this.ARPCacheEvents.add(cacheEvent);
+ log.trace("Enqueued {}", event);
+ }
+ } catch (Exception e) {
+ log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
+ }
+ }
+
+ /*
+ * this thread monitors the connectionEvent queue for new incoming events
+ * from
+ */
+ private class ARPCacheEventHandler implements Runnable {
+ @Override
+ public void run() {
+ while (!stopping) {
+ try {
+ ARPCacheEvent ev = ARPCacheEvents.take();
+ ARPEvent event = ev.getEvent();
+ if (event instanceof ARPRequest) {
+ ARPRequest req = (ARPRequest) event;
+ // If broadcast request
+ if (req.getHost() == null) {
+ log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
+ sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
+
+ // If unicast and local, send reply
+ } else if (connectionManager.getLocalityStatus(req.getHost().getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
+ log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
+ sendUcastARPRequest(req.getHost(), req.getSubnet());
+ }
+ } else if (event instanceof ARPReply) {
+ ARPReply rep = (ARPReply) event;
+ // New reply received by controller, notify all awaiting
+ // requestors across the cluster
+ if (ev.isNewReply()) {
+ log.trace("Trigger a generateAndSendReply in response to {}", rep);
+ generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
+ // Otherwise, a specific reply. If local, send out.
+ } else if (connectionManager.getLocalityStatus(rep.getPort().getNode()) == ConnectionLocality.LOCAL) {
+ log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
+ sendARPReply(rep.getPort(), rep.getSourceMac(), rep.getSourceIP(), rep.getTargetMac(),
+ rep.getTargetIP());
+ }
+ }
+ } catch (InterruptedException e) {
+ ARPCacheEvents.clear();
+ return;
+ }
+ }
+ }
+ }