Altering ARPHandler and SimpleBroadcastHandler to forward traffic immediately.
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /*
11  *
12  */
13 package org.opendaylight.controller.arphandler.internal;
14
15 import java.net.InetAddress;
16 import java.net.UnknownHostException;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.EnumSet;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.opendaylight.controller.arphandler.ARPCacheEvent;
31 import org.opendaylight.controller.arphandler.ARPEvent;
32 import org.opendaylight.controller.arphandler.ARPReply;
33 import org.opendaylight.controller.arphandler.ARPRequest;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.connectionmanager.IConnectionManager;
40 import org.opendaylight.controller.hosttracker.IfHostListener;
41 import org.opendaylight.controller.hosttracker.IfIptoHost;
42 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
43 import org.opendaylight.controller.hosttracker.hostAware.IHostFinder;
44 import org.opendaylight.controller.sal.connection.ConnectionLocality;
45 import org.opendaylight.controller.sal.core.ConstructionException;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.NodeConnector;
48 import org.opendaylight.controller.sal.packet.ARP;
49 import org.opendaylight.controller.sal.packet.Ethernet;
50 import org.opendaylight.controller.sal.packet.IDataPacketService;
51 import org.opendaylight.controller.sal.packet.IListenDataPacket;
52 import org.opendaylight.controller.sal.packet.IPv4;
53 import org.opendaylight.controller.sal.packet.Packet;
54 import org.opendaylight.controller.sal.packet.PacketResult;
55 import org.opendaylight.controller.sal.packet.RawPacket;
56 import org.opendaylight.controller.sal.utils.EtherTypes;
57 import org.opendaylight.controller.sal.utils.HexEncode;
58 import org.opendaylight.controller.sal.utils.NetUtils;
59 import org.opendaylight.controller.switchmanager.ISwitchManager;
60 import org.opendaylight.controller.switchmanager.Subnet;
61 import org.opendaylight.controller.topologymanager.ITopologyManager;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateAware<ARPEvent, Boolean> {
66     private static final Logger log = LoggerFactory.getLogger(ArpHandler.class);
67     static final String ARP_EVENT_CACHE_NAME = "arphandler.arpRequestReplyEvent";
68     private IfIptoHost hostTracker;
69     private ISwitchManager switchManager;
70     private ITopologyManager topologyManager;
71     private IDataPacketService dataPacketService;
72     private IClusterContainerServices clusterContainerService;
73     private IConnectionManager connectionManager;
74     private Set<IfHostListener> hostListeners = new CopyOnWriteArraySet<IfHostListener>();
75     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
76     private ConcurrentMap<InetAddress, Short> countDownTimers;
77     private Timer periodicTimer;
78     private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
79     private Thread cacheEventHandler;
80     private boolean stopping = false;
81     /*
82      * A cluster allocated cache. Used for synchronizing ARP request/reply
83      * events across all cluster controllers. To raise an event, we put() a specific
84      * event object (as key) and all nodes handle it in the entryUpdated callback.
85      *
86      * In case of ARPReply, we put true value to send replies to any requestors
87      * by calling generateAndSendReply
88      */
89     private ConcurrentMap<ARPEvent, Boolean> arpRequestReplyEvent;
90
91     void setConnectionManager(IConnectionManager cm){
92         this.connectionManager = cm;
93     }
94
95     void unsetConnectionManager(IConnectionManager cm){
96         if (this.connectionManager == cm){
97             connectionManager = null;
98         }
99     }
100
101     void setClusterContainerService(IClusterContainerServices s){
102         this.clusterContainerService = s;
103     }
104
105     void unsetClusterContainerService(IClusterContainerServices s) {
106         if (this.clusterContainerService == s) {
107             this.clusterContainerService = null;
108         }
109     }
110
111     void setHostListener(IfHostListener s) {
112         if (this.hostListeners != null) {
113             this.hostListeners.add(s);
114         }
115     }
116
117     void unsetHostListener(IfHostListener s) {
118         if (this.hostListeners != null) {
119             this.hostListeners.remove(s);
120         }
121     }
122
123     void setDataPacketService(IDataPacketService s) {
124         this.dataPacketService = s;
125     }
126
127     void unsetDataPacketService(IDataPacketService s) {
128         if (this.dataPacketService == s) {
129             this.dataPacketService = null;
130         }
131     }
132
133     public void setHostTracker(IfIptoHost hostTracker) {
134         log.debug("Setting HostTracker");
135         this.hostTracker = hostTracker;
136     }
137
138     public void unsetHostTracker(IfIptoHost s) {
139         log.debug("UNSetting HostTracker");
140         if (this.hostTracker == s) {
141             this.hostTracker = null;
142         }
143     }
144
145     public void setTopologyManager(ITopologyManager tm) {
146         this.topologyManager = tm;
147     }
148
149     public void unsetTopologyManager(ITopologyManager tm) {
150         if (this.topologyManager == tm) {
151             this.topologyManager = null;
152         }
153     }
154
155     protected void sendARPReply(NodeConnector p, byte[] sMAC, InetAddress sIP,
156             byte[] tMAC, InetAddress tIP) {
157         byte[] senderIP = sIP.getAddress();
158         byte[] targetIP = tIP.getAddress();
159         ARP arp = createARP(ARP.REPLY,sMAC,senderIP,tMAC,targetIP);
160
161         Ethernet ethernet = createEthernet(sMAC, tMAC, arp);
162
163         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
164         destPkt.setOutgoingNodeConnector(p);
165
166         this.dataPacketService.transmitDataPacket(destPkt);
167     }
168
169     protected void handleARPPacket(Ethernet eHeader, ARP pkt, NodeConnector p) {
170
171         byte[] sourceMAC = eHeader.getSourceMACAddress();
172         byte[] targetMAC = eHeader.getDestinationMACAddress();
173         /*
174          * Sanity Check; drop ARP packets originated by the controller itself.
175          * This is to avoid continuous flooding
176          */
177         if (Arrays.equals(sourceMAC, getControllerMAC())) {
178             if (log.isDebugEnabled()) {
179                 log.debug("Receive a self originated ARP pkt (srcMAC {}) --> DROP",
180                         HexEncode.bytesToHexString(sourceMAC));
181             }
182             return;
183         }
184
185         InetAddress targetIP, sourceIP;
186         try {
187             targetIP = InetAddress.getByAddress(pkt.getTargetProtocolAddress());
188             sourceIP = InetAddress.getByAddress(pkt.getSenderProtocolAddress());
189         } catch (UnknownHostException e1) {
190             log.debug("Invalid host in ARP packet: {}", e1.getMessage());
191             return;
192         }
193
194         Subnet subnet = null;
195         if (switchManager != null) {
196             subnet = switchManager.getSubnetByNetworkAddress(sourceIP);
197         }
198         if (subnet == null) {
199             log.debug("ARPHandler: can't find subnet matching {}, drop packet", sourceIP);
200             return;
201         }
202
203         // Make sure that the host is a legitimate member of this subnet
204         if (!subnet.hasNodeConnector(p)) {
205             log.debug("{} showing up on {} does not belong to {}",
206                     new Object[] { sourceIP, p, subnet });
207             return;
208         }
209
210         HostNodeConnector requestor = null;
211         if (NetUtils.isUnicastMACAddr(sourceMAC) && p.getNode() != null) {
212             try {
213                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
214             } catch (ConstructionException e) {
215                 log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC));
216                 return;
217             }
218             /*
219              * Learn host from the received ARP REQ/REPLY, inform Host Tracker
220              */
221             log.trace("Inform Host tracker of new host {}", requestor.getNetworkAddress());
222             for (IfHostListener listener : this.hostListeners) {
223                 listener.hostListener(requestor);
224             }
225         }
226
227         /*
228          * OpCode != request -> ARP Reply. If there are hosts (in
229          * arpRequestors) waiting for the ARP reply for this sourceIP, it's
230          * time to generate the reply and send it to these hosts.
231          *
232          * If sourceIP==targetIP, it is a Gratuitous ARP. If there are hosts (in
233          * arpRequestors) waiting for the ARP reply for this sourceIP, it's time
234          * to generate the reply and send it to these hosts
235          */
236
237         if (pkt.getOpCode() != ARP.REQUEST || sourceIP.equals(targetIP)) {
238             // Raise a reply event so that any waiting requestors will be sent a reply
239             // the true value indicates we should generate replies to requestors across the cluster
240             log.trace("Received ARP reply packet from {}, reply to all requestors.", sourceIP);
241             arpRequestReplyEvent.put(new ARPReply(sourceIP, sourceMAC), true);
242             return;
243         }
244
245         /*
246          * ARP Request Handling:
247          * If targetIP is the IP of the subnet, reply with ARP REPLY
248          * If targetIP is a known host, PROXY ARP (by sending ARP REPLY) on behalf of known target hosts.
249          * For unknown target hosts, generate and send an ARP request to ALL switches/ports using
250          * the IP address defined in the subnet as source address
251          */
252         /*
253          * If target IP is gateway IP, Send ARP reply
254          */
255         if ((targetIP.equals(subnet.getNetworkAddress()))
256                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
257             if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL){
258                 if (log.isTraceEnabled()){
259                     log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}",
260                             HexEncode.bytesToHexString(getControllerMAC()));
261                 }
262                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
263             } else {
264                 log.trace("Received non-local ARP req. for default gateway. Raising reply event");
265                 arpRequestReplyEvent.put(
266                         new ARPReply(p, targetIP, getControllerMAC(), sourceIP, pkt.getSenderHardwareAddress()), false);
267             }
268             return;
269         }
270
271
272         HostNodeConnector host = hostTracker.hostQuery(targetIP);
273         // unknown host, initiate ARP request
274         if (host == null) {
275             // add the requestor to the list so that we can replay the reply
276             // when the host responds
277             if (requestor != null) {
278                 Set<HostNodeConnector> requestorSet = arpRequestors.get(targetIP);
279                 if (requestorSet == null) {
280                     requestorSet = Collections.newSetFromMap(new ConcurrentHashMap<HostNodeConnector, Boolean>());
281                     arpRequestors.put(targetIP, requestorSet);
282                 }
283                 requestorSet.add(requestor);
284                 countDownTimers.put(targetIP, (short) 2); // reset timeout to 2sec
285             }
286             //Raise a bcast request event, all controllers need to send one
287             log.trace("Sending a bcast ARP request for {}", targetIP);
288             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
289         } else {
290             /*
291              * Target host known (across the cluster), send ARP REPLY make sure that targetMAC
292              * matches the host's MAC if it is not broadcastMAC
293              */
294             if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) {
295                 log.trace("Received ARP req. for known host {}, sending reply...", targetIP);
296                 if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) {
297                     sendARPReply(p,
298                             host.getDataLayerAddressBytes(),
299                             host.getNetworkAddress(),
300                             pkt.getSenderHardwareAddress(),
301                             sourceIP);
302                 } else {
303                     arpRequestReplyEvent.put(new ARPReply(
304                             p,
305                             host.getNetworkAddress(),
306                             host.getDataLayerAddressBytes(),
307                             sourceIP,
308                             pkt.getSenderHardwareAddress()), false);
309                 }
310             } else {
311                 /*
312                  * Target MAC has been changed. For now, discard it.
313                  * TODO: We may need to send unicast ARP REQUEST on behalf of
314                  * the target back to the sender to trigger the sender to update
315                  * its table
316                  */
317             }
318         }
319     }
320
321     /**
322      *  Send a broadcast ARP Request to the switch/ ports  using
323      *  the networkAddress of the subnet as sender IP
324      *  the controller's MAC as sender MAC
325      *  the targetIP as the target Network Address
326      */
327     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
328         log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet);
329         Set<NodeConnector> nodeConnectors;
330         if (subnet.isFlatLayer2()) {
331             nodeConnectors = new HashSet<NodeConnector>();
332             for (Node n : this.switchManager.getNodes()) {
333                 nodeConnectors.addAll(this.switchManager.getUpNodeConnectors(n));
334             }
335         } else {
336             nodeConnectors = subnet.getNodeConnectors();
337         }
338         byte[] targetHardwareAddress = new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
339
340         //TODO: should use IBroadcastHandler instead
341         for (NodeConnector p : nodeConnectors) {
342             //filter out any non-local or internal ports
343             if (!(connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) || topologyManager.isInternal(p)) {
344                 continue;
345             }
346             log.trace("Sending toward nodeConnector:{}", p);
347             byte[] senderIP = subnet.getNetworkAddress().getAddress();
348             byte[] targetIPByte = targetIP.getAddress();
349             ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetHardwareAddress, targetIPByte);
350
351             byte[] destMACAddress = NetUtils.getBroadcastMACAddr();
352             Ethernet ethernet = createEthernet(getControllerMAC(), destMACAddress, arp);
353
354             // TODO For now send port-by-port, see how to optimize to
355             // send to multiple ports at once
356             RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
357             destPkt.setOutgoingNodeConnector(p);
358
359             this.dataPacketService.transmitDataPacket(destPkt);
360         }
361     }
362
363     /**
364      * Send a unicast ARP Request to the known host on a specific switch/port as
365      * defined in the host.
366      * The sender IP is the networkAddress of the subnet
367      * The sender MAC is the controller's MAC
368      */
369     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
370         log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet);
371         NodeConnector outPort = host.getnodeConnector();
372         if (outPort == null) {
373             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
374             return;
375         }
376
377         byte[] senderIP = subnet.getNetworkAddress().getAddress();
378         byte[] targetIP = host.getNetworkAddress().getAddress();
379         byte[] targetMAC = host.getDataLayerAddressBytes();
380         ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetMAC, targetIP);
381
382         Ethernet ethernet = createEthernet(getControllerMAC(), targetMAC, arp);
383
384         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
385         destPkt.setOutgoingNodeConnector(outPort);
386
387         this.dataPacketService.transmitDataPacket(destPkt);
388     }
389
390     @Override
391     public void find(InetAddress networkAddress) {
392         log.trace("Received find IP {}", networkAddress);
393
394         Subnet subnet = null;
395         if (switchManager != null) {
396             subnet = switchManager.getSubnetByNetworkAddress(networkAddress);
397         }
398         if (subnet == null) {
399             log.debug("Can't find subnet matching IP {}", networkAddress);
400             return;
401         }
402
403         // send a broadcast ARP Request to this IP
404         arpRequestReplyEvent.put(new ARPRequest(networkAddress, subnet), false);
405     }
406
407     /*
408      * Probe the host by sending a unicast ARP Request to the host
409      */
410     @Override
411     public void probe(HostNodeConnector host) {
412         log.trace("Received probe host {}", host);
413
414         Subnet subnet = null;
415         if (switchManager != null) {
416             subnet = switchManager.getSubnetByNetworkAddress(host
417                     .getNetworkAddress());
418         }
419         if (subnet == null) {
420             log.debug("can't find subnet matching {}", host.getNetworkAddress());
421             return;
422         }
423
424         if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){
425             log.trace("Send a ucast ARP req. to: {}", host);
426             sendUcastARPRequest(host, subnet);
427         } else {
428             log.trace("Raise a ucast ARP req. event to: {}", host);
429             arpRequestReplyEvent.put(new ARPRequest(host, subnet), false);
430         }
431     }
432
433     /**
434      * An IP packet is punted to the controller, this means that the
435      * destination host is not known to the controller.
436      * Need to discover it by sending a Broadcast ARP Request
437      *
438      * @param pkt
439      * @param p
440      */
441     protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) {
442
443         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
444         if (dIP == null) {
445            return;
446         }
447
448         // try to find a matching subnet
449         Subnet subnet = null;
450         if (switchManager != null) {
451             subnet = switchManager.getSubnetByNetworkAddress(dIP);
452         }
453         if (subnet == null) {
454             log.debug("Can't find subnet matching {}, drop packet", dIP);
455             return;
456         }
457
458         // see if we know about the host
459         HostNodeConnector host = hostTracker.hostFind(dIP);
460
461         if (host == null) {
462             // if we don't, know about the host, try to find it
463             log.trace("Punted IP pkt from {}, sending bcast ARP event...",
464                       dIP);
465             /*
466              * unknown destination host, initiate bcast ARP request
467              */
468             arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false);
469         }else{
470
471             // we know about the host, send the packet the right place
472             NodeConnector nc = host.getnodeConnector();
473
474             // re-encode the Ethernet packet (the parent of the IPv4 packet)
475             RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent());
476             rp.setOutgoingNodeConnector(nc);
477             this.dataPacketService.transmitDataPacket(rp);
478         }
479     }
480
481     public byte[] getControllerMAC() {
482         if (switchManager == null) {
483             return null;
484         }
485         return switchManager.getControllerMAC();
486     }
487
488     /**
489      * Function called by the dependency manager when all the required
490      * dependencies are satisfied
491      *
492      */
493     void init() {
494         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
495         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
496         cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
497
498         allocateCaches();
499         retrieveCaches();
500     }
501
502     @SuppressWarnings({ "unchecked" })
503     private void retrieveCaches() {
504         ConcurrentMap<?,?> map;
505
506         if (this.clusterContainerService == null){
507             log.error("Cluster service unavailable, can't retieve ARPHandler caches!");
508             return;
509         }
510
511         map = clusterContainerService.getCache(ARP_EVENT_CACHE_NAME);
512         if (map != null){
513             this.arpRequestReplyEvent = (ConcurrentMap<ARPEvent, Boolean>) map;
514         } else {
515             log.error("Cache allocation failed for {}", ARP_EVENT_CACHE_NAME);
516         }
517     }
518
519     private void allocateCaches() {
520         if (clusterContainerService == null){
521             nonClusterObjectCreate();
522             log.error("Clustering service unavailable. Allocated non-cluster caches for ARPHandler.");
523             return;
524         }
525
526         try{
527             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
528                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
529         } catch (CacheConfigException e){
530             log.error("ARPHandler cache configuration invalid!");
531         } catch (CacheExistException e){
532             log.debug("ARPHandler cache exists, skipped allocation.");
533         }
534
535     }
536
537     private void nonClusterObjectCreate(){
538         arpRequestReplyEvent = new ConcurrentHashMap<ARPEvent, Boolean>();
539     }
540     /**
541      * Function called by the dependency manager when at least one
542      * dependency become unsatisfied or when the component is shutting
543      * down because for example bundle is being stopped.
544      *
545      */
546     void destroy() {
547         cacheEventHandler.interrupt();
548     }
549
550     /**
551      * Function called by dependency manager after "init ()" is called
552      * and after the services provided by the class are registered in
553      * the service registry
554      *
555      */
556     void start() {
557         stopping = false;
558         startPeriodicTimer();
559         cacheEventHandler.start();
560     }
561
562     /**
563      * Function called by the dependency manager before the services
564      * exported by the component are unregistered, this will be
565      * followed by a "destroy ()" calls
566      *
567      */
568     void stop(){
569     }
570
571     void stopping() {
572         stopping = true;
573         cancelPeriodicTimer();
574     }
575
576     void setSwitchManager(ISwitchManager s) {
577         log.debug("SwitchManager service set.");
578         this.switchManager = s;
579     }
580
581     void unsetSwitchManager(ISwitchManager s) {
582         if (this.switchManager == s) {
583             log.debug("SwitchManager service UNset.");
584             this.switchManager = null;
585         }
586     }
587
588     @Override
589     public PacketResult receiveDataPacket(RawPacket inPkt) {
590         if (inPkt == null) {
591             return PacketResult.IGNORED;
592         }
593         log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
594         Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
595         if (formattedPak instanceof Ethernet) {
596             Object nextPak = formattedPak.getPayload();
597             if (nextPak instanceof IPv4) {
598                 log.trace("Handle IP packet: {}", formattedPak);
599                 handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
600             } else if (nextPak instanceof ARP) {
601                 log.trace("Handle ARP packet: {}", formattedPak);
602                 handleARPPacket((Ethernet) formattedPak, (ARP) nextPak, inPkt
603                         .getIncomingNodeConnector());
604             }
605         }
606         return PacketResult.IGNORED;
607     }
608
609     private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress, byte[] targetIP) {
610             ARP arp = new ARP();
611             arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
612             arp.setProtocolType(EtherTypes.IPv4.shortValue());
613             arp.setHardwareAddressLength((byte) 6);
614             arp.setProtocolAddressLength((byte) 4);
615             arp.setOpCode(opCode);
616             arp.setSenderHardwareAddress(senderMacAddress) ;
617             arp.setSenderProtocolAddress(senderIP);
618             arp.setTargetHardwareAddress(targetMacAddress);
619             arp.setTargetProtocolAddress(targetIP);
620             return arp;
621     }
622
623     private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) {
624         Ethernet ethernet = new Ethernet();
625         ethernet.setSourceMACAddress(sourceMAC);
626         ethernet.setDestinationMACAddress(targetMAC);
627         ethernet.setEtherType(EtherTypes.ARP.shortValue());
628         ethernet.setPayload(arp);
629         return ethernet;
630     }
631
632     private void startPeriodicTimer() {
633         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
634         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
635             @Override
636             public void run() {
637                 Set<InetAddress> targetIPs = countDownTimers.keySet();
638                 Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
639                 for (InetAddress t : targetIPs) {
640                     short tick = countDownTimers.get(t);
641                     tick--;
642                     if (tick <= 0) {
643                         expiredTargets.add(t);
644                     } else {
645                         countDownTimers.replace(t, tick);
646                     }
647                 }
648                 for (InetAddress tIP : expiredTargets) {
649                     countDownTimers.remove(tIP);
650                     // Remove the requestor(s) who have been waiting for the ARP
651                     // reply from this target for more than 1sec
652                     arpRequestors.remove(tIP);
653                     log.debug("ARP reply was not received from {}", tIP);
654                 }
655
656                 // Clean up ARP event cache
657                 try {
658                     if (clusterContainerService.amICoordinator() && ! arpRequestReplyEvent.isEmpty()){
659                         arpRequestReplyEvent.clear();
660                     }
661                 } catch (Exception e){
662                     log.warn("ARPHandler: A cluster member failed to clear event cache.");
663                 }
664             }
665         }, 0, 1000);
666     }
667
668     private void cancelPeriodicTimer() {
669         if (this.periodicTimer != null) {
670             this.periodicTimer.cancel();
671         }
672     }
673
674     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
675         if (log.isTraceEnabled()) {
676             log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
677                       HexEncode.bytesToHexString(sourceMAC));
678         }
679         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
680         if ((hosts == null) || hosts.isEmpty()) {
681             log.trace("Bailing out no requestors Hosts");
682             return;
683         }
684         countDownTimers.remove(sourceIP);
685         for (HostNodeConnector host : hosts) {
686             if (log.isTraceEnabled()) {
687                 log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
688                           new Object[] {
689                               HexEncode.bytesToHexString(sourceMAC),
690                               sourceIP,
691                               HexEncode.bytesToHexString(host.getDataLayerAddressBytes()),
692                               host.getNetworkAddress() });
693             }
694             if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){
695                 sendARPReply(host.getnodeConnector(),
696                         sourceMAC,
697                         sourceIP,
698                         host.getDataLayerAddressBytes(),
699                         host.getNetworkAddress());
700             } else {
701                 /*
702                  * In the remote event a requestor moved to another
703                  * controller it may turn out it now we need to send
704                  * the ARP reply from a different controller, this
705                  * cover the case
706                  */
707                 arpRequestReplyEvent.put(
708                         new ARPReply(
709                             host.getnodeConnector(),
710                             sourceIP,
711                             sourceMAC,
712                             host.getNetworkAddress(),
713                             host.getDataLayerAddressBytes()), false);
714             }
715         }
716     }
717
718
719     @Override
720     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
721         log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
722         enqueueARPCacheEvent(key, new_value);
723     }
724
725     @Override
726     public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
727         // nothing to do
728     }
729     @Override
730     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
731         // nothing to do
732     }
733
734     private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
735         try {
736             ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
737             if (!ARPCacheEvents.contains(cacheEvent)) {
738                 this.ARPCacheEvents.add(cacheEvent);
739                 log.trace("Enqueued {}", event);
740             }
741         } catch (Exception e) {
742             log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
743         }
744     }
745
746     /*
747      * this thread monitors the connectionEvent queue for new incoming events from
748      */
749     private class ARPCacheEventHandler implements Runnable {
750         @Override
751         public void run() {
752             while (!stopping) {
753                 try {
754                     ARPCacheEvent ev = ARPCacheEvents.take();
755                     ARPEvent event = ev.getEvent();
756                     if (event instanceof ARPRequest) {
757                         ARPRequest req = (ARPRequest) event;
758                         // If broadcast request
759                         if (req.getHost() == null) {
760                             log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
761                             sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
762
763                         //If unicast and local, send reply
764                         } else if (connectionManager.getLocalityStatus(req.getHost().getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
765                             log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
766                             sendUcastARPRequest(req.getHost(), req.getSubnet());
767                         }
768                     } else if (event instanceof ARPReply) {
769                         ARPReply rep = (ARPReply) event;
770                         // New reply received by controller, notify all awaiting requestors across the cluster
771                         if (ev.isNewReply()) {
772                             log.trace("Trigger a generateAndSendReply in response to {}", rep);
773                             generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
774                         // Otherwise, a specific reply. If local, send out.
775                         } else if (connectionManager.getLocalityStatus(rep.getPort().getNode()) == ConnectionLocality.LOCAL) {
776                             log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
777                             sendARPReply(rep.getPort(),
778                                     rep.getSourceMac(),
779                                     rep.getSourceIP(),
780                                     rep.getTargetMac(),
781                                     rep.getTargetIP());
782                         }
783                     }
784                 } catch (InterruptedException e) {
785                     ARPCacheEvents.clear();
786                     return;
787                 }
788             }
789         }
790     }
791 }