+
+ private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress, byte[] targetIP) {
+ ARP arp = new ARP();
+ arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
+ arp.setProtocolType(EtherTypes.IPv4.shortValue());
+ arp.setHardwareAddressLength((byte) 6);
+ arp.setProtocolAddressLength((byte) 4);
+ arp.setOpCode(opCode);
+ arp.setSenderHardwareAddress(senderMacAddress) ;
+ arp.setSenderProtocolAddress(senderIP);
+ arp.setTargetHardwareAddress(targetMacAddress);
+ arp.setTargetProtocolAddress(targetIP);
+ return arp;
+ }
+
+ private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) {
+ Ethernet ethernet = new Ethernet();
+ ethernet.setSourceMACAddress(sourceMAC);
+ ethernet.setDestinationMACAddress(targetMAC);
+ ethernet.setEtherType(EtherTypes.ARP.shortValue());
+ ethernet.setPayload(arp);
+ return ethernet;
+ }
+
+ private void startPeriodicTimer() {
+ this.periodicTimer = new Timer("ArpHandler Periodic Timer");
+ this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ Set<InetAddress> targetIPs = countDownTimers.keySet();
+ Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
+ for (InetAddress t : targetIPs) {
+ short tick = countDownTimers.get(t);
+ tick--;
+ if (tick <= 0) {
+ expiredTargets.add(t);
+ } else {
+ countDownTimers.replace(t, tick);
+ }
+ }
+ for (InetAddress tIP : expiredTargets) {
+ countDownTimers.remove(tIP);
+ // Remove the requestor(s) who have been waiting for the ARP
+ // reply from this target for more than 1sec
+ arpRequestors.remove(tIP);
+ log.debug("ARP reply was not received from {}", tIP);
+ }
+
+ // Clean up ARP event cache
+ try {
+ if (clusterContainerService.amICoordinator() && ! arpRequestReplyEvent.isEmpty()){
+ arpRequestReplyEvent.clear();
+ }
+ } catch (Exception e){
+ log.warn("ARPHandler: A cluster member failed to clear event cache.");
+ }
+ }
+ }, 0, 1000);
+ }
+
+ private void cancelPeriodicTimer() {
+ if (this.periodicTimer != null) {
+ this.periodicTimer.cancel();
+ }
+ }
+
+ private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
+ if (log.isTraceEnabled()) {
+ log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
+ HexEncode.bytesToHexString(sourceMAC));
+ }
+ Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
+ if ((hosts == null) || hosts.isEmpty()) {
+ log.trace("Bailing out no requestors Hosts");
+ return;
+ }
+ countDownTimers.remove(sourceIP);
+ for (HostNodeConnector host : hosts) {
+ if (log.isTraceEnabled()) {
+ log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
+ new Object[] {
+ HexEncode.bytesToHexString(sourceMAC),
+ sourceIP,
+ HexEncode.bytesToHexString(host.getDataLayerAddressBytes()),
+ host.getNetworkAddress() });
+ }
+ if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){
+ sendARPReply(host.getnodeConnector(),
+ sourceMAC,
+ sourceIP,
+ host.getDataLayerAddressBytes(),
+ host.getNetworkAddress());
+ } else {
+ /*
+ * In the remote event a requestor moved to another
+ * controller it may turn out it now we need to send
+ * the ARP reply from a different controller, this
+ * cover the case
+ */
+ arpRequestReplyEvent.put(
+ new ARPReply(
+ host.getnodeConnector(),
+ sourceIP,
+ sourceMAC,
+ host.getNetworkAddress(),
+ host.getDataLayerAddressBytes()), false);
+ }
+ }
+ }
+
+
+ @Override
+ public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
+ log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
+ enqueueARPCacheEvent(key, new_value);
+ }
+
+ @Override
+ public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
+ // nothing to do
+ }
+ @Override
+ public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
+ // nothing to do
+ }
+
+ private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
+ try {
+ ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
+ if (!ARPCacheEvents.contains(cacheEvent)) {
+ this.ARPCacheEvents.add(cacheEvent);
+ log.trace("Enqueued {}", event);
+ }
+ } catch (Exception e) {
+ log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
+ }
+ }
+
+ /*
+ * this thread monitors the connectionEvent queue for new incoming events from
+ */
+ private class ARPCacheEventHandler implements Runnable {
+ @Override
+ public void run() {
+ while (!stopping) {
+ try {
+ ARPCacheEvent ev = ARPCacheEvents.take();
+ ARPEvent event = ev.getEvent();
+ if (event instanceof ARPRequest) {
+ ARPRequest req = (ARPRequest) event;
+ // If broadcast request
+ if (req.getHost() == null) {
+ log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
+ sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
+
+ //If unicast and local, send reply
+ } else if (connectionManager.getLocalityStatus(req.getHost().getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
+ log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
+ sendUcastARPRequest(req.getHost(), req.getSubnet());
+ }
+ } else if (event instanceof ARPReply) {
+ ARPReply rep = (ARPReply) event;
+ // New reply received by controller, notify all awaiting requestors across the cluster
+ if (ev.isNewReply()) {
+ log.trace("Trigger a generateAndSendReply in response to {}", rep);
+ generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
+ // Otherwise, a specific reply. If local, send out.
+ } else if (connectionManager.getLocalityStatus(rep.getPort().getNode()) == ConnectionLocality.LOCAL) {
+ log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
+ sendARPReply(rep.getPort(),
+ rep.getSourceMac(),
+ rep.getSourceIP(),
+ rep.getTargetMac(),
+ rep.getTargetIP());
+ }
+ }
+ } catch (InterruptedException e) {
+ ARPCacheEvents.clear();
+ return;
+ }
+ }
+ }
+ }