Update context-instance xpath format according to latest changes in config-api yang.
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /*
11  *
12  */
13 package org.opendaylight.controller.arphandler.internal;
14
15 import java.net.InetAddress;
16 import java.net.UnknownHostException;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.EnumSet;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.opendaylight.controller.arphandler.ARPCacheEvent;
31 import org.opendaylight.controller.arphandler.ARPEvent;
32 import org.opendaylight.controller.arphandler.ARPReply;
33 import org.opendaylight.controller.arphandler.ARPRequest;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.connectionmanager.IConnectionManager;
40 import org.opendaylight.controller.hosttracker.IfHostListener;
41 import org.opendaylight.controller.hosttracker.IfIptoHost;
42 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
43 import org.opendaylight.controller.hosttracker.hostAware.IHostFinder;
44 import org.opendaylight.controller.sal.connection.ConnectionLocality;
45 import org.opendaylight.controller.sal.core.ConstructionException;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.NodeConnector;
48 import org.opendaylight.controller.sal.packet.ARP;
49 import org.opendaylight.controller.sal.packet.Ethernet;
50 import org.opendaylight.controller.sal.packet.IDataPacketService;
51 import org.opendaylight.controller.sal.packet.IListenDataPacket;
52 import org.opendaylight.controller.sal.packet.IPv4;
53 import org.opendaylight.controller.sal.packet.Packet;
54 import org.opendaylight.controller.sal.packet.PacketResult;
55 import org.opendaylight.controller.sal.packet.RawPacket;
56 import org.opendaylight.controller.sal.routing.IRouting;
57 import org.opendaylight.controller.sal.utils.EtherTypes;
58 import org.opendaylight.controller.sal.utils.HexEncode;
59 import org.opendaylight.controller.sal.utils.NetUtils;
60 import org.opendaylight.controller.switchmanager.ISwitchManager;
61 import org.opendaylight.controller.switchmanager.Subnet;
62 import org.opendaylight.controller.topologymanager.ITopologyManager;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateAware<ARPEvent, Boolean> {
67     private static final Logger log = LoggerFactory.getLogger(ArpHandler.class);
68     static final String ARP_EVENT_CACHE_NAME = "arphandler.arpRequestReplyEvent"; // name used to create/retrieve the cluster cache of ARP events
69     private IfIptoHost hostTracker;
70     private ISwitchManager switchManager;
71     private ITopologyManager topologyManager;
72     private IDataPacketService dataPacketService;
73     private IRouting routing;
74     private IClusterContainerServices clusterContainerService;
75     private IConnectionManager connectionManager;
76     private Set<IfHostListener> hostListeners = new CopyOnWriteArraySet<IfHostListener>(); // notified of every host learned from an ARP packet
77     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors; // target IP -> hosts waiting for an ARP reply from that IP
78     private ConcurrentMap<InetAddress, Short> countDownTimers; // target IP -> remaining ticks of the periodic timer before its requestors are dropped
79     private Timer periodicTimer;
80     private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>(); // events enqueued from the cache-update callback
81     private Thread cacheEventHandler;
82     private boolean stopping = false; // set by stopping(), cleared by start()
83     /*
84      * A cluster allocated cache. Used for synchronizing ARP request/reply
85      * events across all cluster controllers. To raise an event, we put() a specific
86      * event object (as key) and all nodes handle it in the entryUpdated callback.
87      *
88      * In case of ARPReply, we put true value to send replies to any requestors
89      * by calling generateAndSendReply
90      */
91     private ConcurrentMap<ARPEvent, Boolean> arpRequestReplyEvent;
92
93     void setConnectionManager(IConnectionManager cm){
94         this.connectionManager = cm;
95     }
96
97     void unsetConnectionManager(IConnectionManager cm){
98         if (this.connectionManager == cm){
99             connectionManager = null;
100         }
101     }
102
103     void setClusterContainerService(IClusterContainerServices s){
104         this.clusterContainerService = s;
105     }
106
107     void unsetClusterContainerService(IClusterContainerServices s) {
108         if (this.clusterContainerService == s) {
109             this.clusterContainerService = null;
110         }
111     }
112
113     void setRouting(IRouting r) {
114         this.routing = r;
115     }
116
117     void unsetRouting(IRouting r) {
118         if (this.routing == r) {
119             this.routing = null;
120         }
121     }
122
123     void setHostListener(IfHostListener s) {
124         if (this.hostListeners != null) {
125             this.hostListeners.add(s);
126         }
127     }
128
129     void unsetHostListener(IfHostListener s) {
130         if (this.hostListeners != null) {
131             this.hostListeners.remove(s);
132         }
133     }
134
135     void setDataPacketService(IDataPacketService s) {
136         this.dataPacketService = s;
137     }
138
139     void unsetDataPacketService(IDataPacketService s) {
140         if (this.dataPacketService == s) {
141             this.dataPacketService = null;
142         }
143     }
144
145     public void setHostTracker(IfIptoHost hostTracker) {
146         log.debug("Setting HostTracker");
147         this.hostTracker = hostTracker;
148     }
149
150     public void unsetHostTracker(IfIptoHost s) {
151         log.debug("UNSetting HostTracker");
152         if (this.hostTracker == s) {
153             this.hostTracker = null;
154         }
155     }
156
157     public void setTopologyManager(ITopologyManager tm) {
158         this.topologyManager = tm;
159     }
160
161     public void unsetTopologyManager(ITopologyManager tm) {
162         if (this.topologyManager == tm) {
163             this.topologyManager = null;
164         }
165     }
166
167     protected void sendARPReply(NodeConnector p, byte[] sMAC, InetAddress sIP,
168             byte[] tMAC, InetAddress tIP) {
169         byte[] senderIP = sIP.getAddress();
170         byte[] targetIP = tIP.getAddress();
171         ARP arp = createARP(ARP.REPLY,sMAC,senderIP,tMAC,targetIP);
172
173         Ethernet ethernet = createEthernet(sMAC, tMAC, arp);
174
175         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
176         destPkt.setOutgoingNodeConnector(p);
177
178         this.dataPacketService.transmitDataPacket(destPkt);
179     }
180
181     protected void handleARPPacket(Ethernet eHeader, ARP pkt, NodeConnector p) {
182
183         byte[] sourceMAC = eHeader.getSourceMACAddress();
184         byte[] targetMAC = eHeader.getDestinationMACAddress();
185         /*
186          * Sanity Check; drop ARP packets originated by the controller itself.
187          * This is to avoid continuous flooding
188          */
189         if (Arrays.equals(sourceMAC, getControllerMAC())) {
190             if (log.isDebugEnabled()) {
191                 log.debug("Receive a self originated ARP pkt (srcMAC {}) --> DROP",
192                         HexEncode.bytesToHexString(sourceMAC));
193             }
194             return;
195         }
196
197         InetAddress targetIP, sourceIP;
198         try {
199             targetIP = InetAddress.getByAddress(pkt.getTargetProtocolAddress());
200             sourceIP = InetAddress.getByAddress(pkt.getSenderProtocolAddress());
201         } catch (UnknownHostException e1) {
202             log.debug("Invalid host in ARP packet: {}", e1.getMessage());
203             return;
204         }
205
206         Subnet subnet = null;
207         if (switchManager != null) {
208             subnet = switchManager.getSubnetByNetworkAddress(sourceIP);
209         }
210         if (subnet == null) {
211             log.debug("ARPHandler: can't find subnet matching {}, drop packet", sourceIP);
212             return;
213         }
214
215         // Make sure that the host is a legitimate member of this subnet
216         if (!subnet.hasNodeConnector(p)) {
217             log.debug("{} showing up on {} does not belong to {}",
218                     new Object[] { sourceIP, p, subnet });
219             return;
220         }
221
222         HostNodeConnector requestor = null;
223         if (NetUtils.isUnicastMACAddr(sourceMAC) && p.getNode() != null) {
224             try {
225                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
226             } catch (ConstructionException e) {
227                 log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC));
228                 return;
229             }
230             /*
231              * Learn host from the received ARP REQ/REPLY, inform Host Tracker
232              */
233             log.trace("Inform Host tracker of new host {}", requestor.getNetworkAddress());
234             for (IfHostListener listener : this.hostListeners) {
235                 listener.hostListener(requestor);
236             }
237         }
238
239         /*
240          * OpCode != request -> ARP Reply. If there are hosts (in
241          * arpRequestors) waiting for the ARP reply for this sourceIP, it's
242          * time to generate the reply and send it to these hosts.
243          *
244          * If sourceIP==targetIP, it is a Gratuitous ARP. If there are hosts (in
245          * arpRequestors) waiting for the ARP reply for this sourceIP, it's time
246          * to generate the reply and send it to these hosts
247          */
248
249         if (pkt.getOpCode() != ARP.REQUEST || sourceIP.equals(targetIP)) {
250             // Raise a reply event so that any waiting requestors will be sent a reply
251             // the true value indicates we should generate replies to requestors across the cluster
252             log.trace("Received ARP reply packet from {}, reply to all requestors.", sourceIP);
253             arpRequestReplyEvent.put(new ARPReply(sourceIP, sourceMAC), true);
254             return;
255         }
256
257         /*
258          * ARP Request Handling:
259          * If targetIP is the IP of the subnet, reply with ARP REPLY
260          * If targetIP is a known host, PROXY ARP (by sending ARP REPLY) on behalf of known target hosts.
261          * For unknown target hosts, generate and send an ARP request to ALL switches/ports using
262          * the IP address defined in the subnet as source address
263          */
264         /*
265          * If target IP is gateway IP, Send ARP reply
266          */
267         if ((targetIP.equals(subnet.getNetworkAddress()))
268                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
269             if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL){
270                 if (log.isTraceEnabled()){
271                     log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}",
272                             HexEncode.bytesToHexString(getControllerMAC()));
273                 }
274                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
275             } else {
276                 log.trace("Received non-local ARP req. for default gateway. Raising reply event");
277                 arpRequestReplyEvent.put(
278                         new ARPReply(p, targetIP, getControllerMAC(), sourceIP, pkt.getSenderHardwareAddress()), false);
279             }
280             return;
281         }
282
283
284         HostNodeConnector host = hostTracker.hostQuery(targetIP);
285         // unknown host, initiate ARP request
286         if (host == null) {
287             // add the requestor to the list so that we can replay the reply
288             // when the host responds
289             if (requestor != null) {
290                 Set<HostNodeConnector> requestorSet = arpRequestors.get(targetIP);
291                 if (requestorSet == null) {
292                     requestorSet = Collections.newSetFromMap(new ConcurrentHashMap<HostNodeConnector, Boolean>());
293                     arpRequestors.put(targetIP, requestorSet);
294                 }
295                 requestorSet.add(requestor);
296                 countDownTimers.put(targetIP, (short) 2); // reset timeout to 2sec
297             }
298             //Raise a bcast request event, all controllers need to send one
299             log.trace("Sending a bcast ARP request for {}", targetIP);
300             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
301         } else {
302             /*
303              * Target host known (across the cluster), send ARP REPLY make sure that targetMAC
304              * matches the host's MAC if it is not broadcastMAC
305              */
306             if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) {
307                 log.trace("Received ARP req. for known host {}, sending reply...", targetIP);
308                 if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) {
309                     sendARPReply(p,
310                             host.getDataLayerAddressBytes(),
311                             host.getNetworkAddress(),
312                             pkt.getSenderHardwareAddress(),
313                             sourceIP);
314                 } else {
315                     arpRequestReplyEvent.put(new ARPReply(
316                             p,
317                             host.getNetworkAddress(),
318                             host.getDataLayerAddressBytes(),
319                             sourceIP,
320                             pkt.getSenderHardwareAddress()), false);
321                 }
322             } else {
323                 /*
324                  * Target MAC has been changed. For now, discard it.
325                  * TODO: We may need to send unicast ARP REQUEST on behalf of
326                  * the target back to the sender to trigger the sender to update
327                  * its table
328                  */
329             }
330         }
331     }
332
333     /**
334      *  Send a broadcast ARP Request to the switch/ ports  using
335      *  the networkAddress of the subnet as sender IP
336      *  the controller's MAC as sender MAC
337      *  the targetIP as the target Network Address
338      */
339     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
340         log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet);
341         Set<NodeConnector> nodeConnectors;
342         if (subnet.isFlatLayer2()) {
343             nodeConnectors = new HashSet<NodeConnector>();
344             for (Node n : this.switchManager.getNodes()) {
345                 nodeConnectors.addAll(this.switchManager.getUpNodeConnectors(n));
346             }
347         } else {
348             nodeConnectors = subnet.getNodeConnectors();
349         }
350         byte[] targetHardwareAddress = new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
351
352         //TODO: should use IBroadcastHandler instead
353         for (NodeConnector p : nodeConnectors) {
354             //filter out any non-local or internal ports
355             if (!(connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) || topologyManager.isInternal(p)) {
356                 continue;
357             }
358             log.trace("Sending toward nodeConnector:{}", p);
359             byte[] senderIP = subnet.getNetworkAddress().getAddress();
360             byte[] targetIPByte = targetIP.getAddress();
361             ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetHardwareAddress, targetIPByte);
362
363             byte[] destMACAddress = NetUtils.getBroadcastMACAddr();
364             Ethernet ethernet = createEthernet(getControllerMAC(), destMACAddress, arp);
365
366             // TODO For now send port-by-port, see how to optimize to
367             // send to multiple ports at once
368             RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
369             destPkt.setOutgoingNodeConnector(p);
370
371             this.dataPacketService.transmitDataPacket(destPkt);
372         }
373     }
374
375     /**
376      * Send a unicast ARP Request to the known host on a specific switch/port as
377      * defined in the host.
378      * The sender IP is the networkAddress of the subnet
379      * The sender MAC is the controller's MAC
380      */
381     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
382         log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet);
383         NodeConnector outPort = host.getnodeConnector();
384         if (outPort == null) {
385             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
386             return;
387         }
388
389         byte[] senderIP = subnet.getNetworkAddress().getAddress();
390         byte[] targetIP = host.getNetworkAddress().getAddress();
391         byte[] targetMAC = host.getDataLayerAddressBytes();
392         ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetMAC, targetIP);
393
394         Ethernet ethernet = createEthernet(getControllerMAC(), targetMAC, arp);
395
396         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
397         destPkt.setOutgoingNodeConnector(outPort);
398
399         this.dataPacketService.transmitDataPacket(destPkt);
400     }
401
402     @Override
403     public void find(InetAddress networkAddress) {
404         log.trace("Received find IP {}", networkAddress);
405
406         Subnet subnet = null;
407         if (switchManager != null) {
408             subnet = switchManager.getSubnetByNetworkAddress(networkAddress);
409         }
410         if (subnet == null) {
411             log.debug("Can't find subnet matching IP {}", networkAddress);
412             return;
413         }
414
415         // send a broadcast ARP Request to this IP
416         arpRequestReplyEvent.put(new ARPRequest(networkAddress, subnet), false);
417     }
418
419     /*
420      * Probe the host by sending a unicast ARP Request to the host
421      */
422     @Override
423     public void probe(HostNodeConnector host) {
424         log.trace("Received probe host {}", host);
425
426         Subnet subnet = null;
427         if (switchManager != null) {
428             subnet = switchManager.getSubnetByNetworkAddress(host
429                     .getNetworkAddress());
430         }
431         if (subnet == null) {
432             log.debug("can't find subnet matching {}", host.getNetworkAddress());
433             return;
434         }
435
436         if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){
437             log.trace("Send a ucast ARP req. to: {}", host);
438             sendUcastARPRequest(host, subnet);
439         } else {
440             log.trace("Raise a ucast ARP req. event to: {}", host);
441             arpRequestReplyEvent.put(new ARPRequest(host, subnet), false);
442         }
443     }
444
445     /**
446      * An IP packet is punted to the controller, this means that the
447      * destination host is not known to the controller.
448      * Need to discover it by sending a Broadcast ARP Request
449      *
450      * @param pkt
451      * @param p
452      */
453     protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) {
454
455         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
456         if (dIP == null) {
457            return;
458         }
459
460         // try to find a matching subnet
461         Subnet subnet = null;
462         if (switchManager != null) {
463             subnet = switchManager.getSubnetByNetworkAddress(dIP);
464         }
465         if (subnet == null) {
466             log.debug("Can't find subnet matching {}, drop packet", dIP);
467             return;
468         }
469
470         // see if we know about the host
471         HostNodeConnector host = hostTracker.hostFind(dIP);
472
473         if (host == null) {
474             // if we don't, know about the host, try to find it
475             log.trace("Punted IP pkt to {}, sending bcast ARP event...",
476                       dIP);
477             /*
478              * unknown destination host, initiate bcast ARP request
479              */
480             arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false);
481
482         } else if (routing == null ||
483                    routing.getRoute(p.getNode(), host.getnodeconnectorNode()) != null) {
484             /* if IRouting is available, make sure that this packet can get it's
485              * destination normally before teleporting it there. If it's not
486              * available, then assume it's reachable.
487              *
488              * TODO: come up with a way to do this in the absence of IRouting
489              */
490
491             log.trace("forwarding punted IP pkt to {} received at {}", dIP, p);
492
493             /* if we know where the host is and there's a path from where this
494              * packet was punted to where the host is, then deliver it to the
495              * host for now */
496             NodeConnector nc = host.getnodeConnector();
497
498             // re-encode the Ethernet packet (the parent of the IPv4 packet)
499             RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent());
500             rp.setOutgoingNodeConnector(nc);
501             this.dataPacketService.transmitDataPacket(rp);
502         } else {
503             log.trace("ignoring punted IP pkt to {} because there is no route from {}",
504                       dIP, p);
505         }
506     }
507
508     public byte[] getControllerMAC() {
509         if (switchManager == null) {
510             return null;
511         }
512         return switchManager.getControllerMAC();
513     }
514
515     /**
516      * Function called by the dependency manager when all the required
517      * dependencies are satisfied
518      *
519      */
520     void init() {
521         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
522         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
523         cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
524
525         allocateCaches();
526         retrieveCaches();
527     }
528
529     @SuppressWarnings({ "unchecked" })
530     private void retrieveCaches() {
531         ConcurrentMap<?,?> map;
532
533         if (this.clusterContainerService == null){
534             log.error("Cluster service unavailable, can't retieve ARPHandler caches!");
535             return;
536         }
537
538         map = clusterContainerService.getCache(ARP_EVENT_CACHE_NAME);
539         if (map != null){
540             this.arpRequestReplyEvent = (ConcurrentMap<ARPEvent, Boolean>) map;
541         } else {
542             log.error("Cache allocation failed for {}", ARP_EVENT_CACHE_NAME);
543         }
544     }
545
546     private void allocateCaches() {
547         if (clusterContainerService == null){
548             nonClusterObjectCreate();
549             log.error("Clustering service unavailable. Allocated non-cluster caches for ARPHandler.");
550             return;
551         }
552
553         try{
554             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
555                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
556         } catch (CacheConfigException e){
557             log.error("ARPHandler cache configuration invalid!");
558         } catch (CacheExistException e){
559             log.debug("ARPHandler cache exists, skipped allocation.");
560         }
561
562     }
563
564     private void nonClusterObjectCreate(){
565         arpRequestReplyEvent = new ConcurrentHashMap<ARPEvent, Boolean>();
566     }
567     /**
568      * Function called by the dependency manager when at least one
569      * dependency become unsatisfied or when the component is shutting
570      * down because for example bundle is being stopped.
571      *
572      */
573     void destroy() {
574         cacheEventHandler.interrupt();
575     }
576
577     /**
578      * Function called by dependency manager after "init ()" is called
579      * and after the services provided by the class are registered in
580      * the service registry
581      *
582      */
583     void start() {
584         stopping = false;
585         startPeriodicTimer();
586         cacheEventHandler.start();
587     }
588
589     /**
590      * Function called by the dependency manager before the services
591      * exported by the component are unregistered, this will be
592      * followed by a "destroy ()" calls
593      *
594      */
595     void stop(){
596     }
597
598     void stopping() {
599         stopping = true;
600         cancelPeriodicTimer();
601     }
602
603     void setSwitchManager(ISwitchManager s) {
604         log.debug("SwitchManager service set.");
605         this.switchManager = s;
606     }
607
608     void unsetSwitchManager(ISwitchManager s) {
609         if (this.switchManager == s) {
610             log.debug("SwitchManager service UNset.");
611             this.switchManager = null;
612         }
613     }
614
615     @Override
616     public PacketResult receiveDataPacket(RawPacket inPkt) {
617         if (inPkt == null) {
618             return PacketResult.IGNORED;
619         }
620         log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
621         Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
622         if (formattedPak instanceof Ethernet) {
623             Object nextPak = formattedPak.getPayload();
624             if (nextPak instanceof IPv4) {
625                 log.trace("Handle IP packet: {}", formattedPak);
626                 handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
627             } else if (nextPak instanceof ARP) {
628                 log.trace("Handle ARP packet: {}", formattedPak);
629                 handleARPPacket((Ethernet) formattedPak, (ARP) nextPak, inPkt
630                         .getIncomingNodeConnector());
631             }
632         }
633         return PacketResult.IGNORED;
634     }
635
636     private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress, byte[] targetIP) {
637             ARP arp = new ARP();
638             arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
639             arp.setProtocolType(EtherTypes.IPv4.shortValue());
640             arp.setHardwareAddressLength((byte) 6);
641             arp.setProtocolAddressLength((byte) 4);
642             arp.setOpCode(opCode);
643             arp.setSenderHardwareAddress(senderMacAddress) ;
644             arp.setSenderProtocolAddress(senderIP);
645             arp.setTargetHardwareAddress(targetMacAddress);
646             arp.setTargetProtocolAddress(targetIP);
647             return arp;
648     }
649
650     private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) {
651         Ethernet ethernet = new Ethernet();
652         ethernet.setSourceMACAddress(sourceMAC);
653         ethernet.setDestinationMACAddress(targetMAC);
654         ethernet.setEtherType(EtherTypes.ARP.shortValue());
655         ethernet.setPayload(arp);
656         return ethernet;
657     }
658
659     private void startPeriodicTimer() {
660         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
661         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
662             @Override
663             public void run() {
664                 Set<InetAddress> targetIPs = countDownTimers.keySet();
665                 Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
666                 for (InetAddress t : targetIPs) {
667                     short tick = countDownTimers.get(t);
668                     tick--;
669                     if (tick <= 0) {
670                         expiredTargets.add(t);
671                     } else {
672                         countDownTimers.replace(t, tick);
673                     }
674                 }
675                 for (InetAddress tIP : expiredTargets) {
676                     countDownTimers.remove(tIP);
677                     // Remove the requestor(s) who have been waiting for the ARP
678                     // reply from this target for more than 1sec
679                     arpRequestors.remove(tIP);
680                     log.debug("ARP reply was not received from {}", tIP);
681                 }
682
683                 // Clean up ARP event cache
684                 try {
685                     if (clusterContainerService.amICoordinator() && ! arpRequestReplyEvent.isEmpty()){
686                         arpRequestReplyEvent.clear();
687                     }
688                 } catch (Exception e){
689                     log.warn("ARPHandler: A cluster member failed to clear event cache.");
690                 }
691             }
692         }, 0, 1000);
693     }
694
695     private void cancelPeriodicTimer() {
696         if (this.periodicTimer != null) {
697             this.periodicTimer.cancel();
698         }
699     }
700
701     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
702         if (log.isTraceEnabled()) {
703             log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
704                       HexEncode.bytesToHexString(sourceMAC));
705         }
706         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
707         if ((hosts == null) || hosts.isEmpty()) {
708             log.trace("Bailing out no requestors Hosts");
709             return;
710         }
711         countDownTimers.remove(sourceIP);
712         for (HostNodeConnector host : hosts) {
713             if (log.isTraceEnabled()) {
714                 log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
715                           new Object[] {
716                               HexEncode.bytesToHexString(sourceMAC),
717                               sourceIP,
718                               HexEncode.bytesToHexString(host.getDataLayerAddressBytes()),
719                               host.getNetworkAddress() });
720             }
721             if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){
722                 sendARPReply(host.getnodeConnector(),
723                         sourceMAC,
724                         sourceIP,
725                         host.getDataLayerAddressBytes(),
726                         host.getNetworkAddress());
727             } else {
728                 /*
729                  * In the remote event a requestor moved to another
730                  * controller it may turn out it now we need to send
731                  * the ARP reply from a different controller, this
732                  * cover the case
733                  */
734                 arpRequestReplyEvent.put(
735                         new ARPReply(
736                             host.getnodeConnector(),
737                             sourceIP,
738                             sourceMAC,
739                             host.getNetworkAddress(),
740                             host.getDataLayerAddressBytes()), false);
741             }
742         }
743     }
744
745
746     @Override
747     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
748         log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
749         enqueueARPCacheEvent(key, new_value);
750     }
751
752     @Override
753     public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
754         // nothing to do
755     }
756     @Override
757     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
758         // nothing to do
759     }
760
761     private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
762         try {
763             ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
764             if (!ARPCacheEvents.contains(cacheEvent)) {
765                 this.ARPCacheEvents.add(cacheEvent);
766                 log.trace("Enqueued {}", event);
767             }
768         } catch (Exception e) {
769             log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
770         }
771     }
772
773     /*
774      * this thread monitors the connectionEvent queue for new incoming events from
775      */
776     private class ARPCacheEventHandler implements Runnable {
777         @Override
778         public void run() {
779             while (!stopping) {
780                 try {
781                     ARPCacheEvent ev = ARPCacheEvents.take();
782                     ARPEvent event = ev.getEvent();
783                     if (event instanceof ARPRequest) {
784                         ARPRequest req = (ARPRequest) event;
785                         // If broadcast request
786                         if (req.getHost() == null) {
787                             log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
788                             sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
789
790                         //If unicast and local, send reply
791                         } else if (connectionManager.getLocalityStatus(req.getHost().getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
792                             log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
793                             sendUcastARPRequest(req.getHost(), req.getSubnet());
794                         }
795                     } else if (event instanceof ARPReply) {
796                         ARPReply rep = (ARPReply) event;
797                         // New reply received by controller, notify all awaiting requestors across the cluster
798                         if (ev.isNewReply()) {
799                             log.trace("Trigger a generateAndSendReply in response to {}", rep);
800                             generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
801                         // Otherwise, a specific reply. If local, send out.
802                         } else if (connectionManager.getLocalityStatus(rep.getPort().getNode()) == ConnectionLocality.LOCAL) {
803                             log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
804                             sendARPReply(rep.getPort(),
805                                     rep.getSourceMac(),
806                                     rep.getSourceIP(),
807                                     rep.getTargetMac(),
808                                     rep.getTargetIP());
809                         }
810                     }
811                 } catch (InterruptedException e) {
812                     ARPCacheEvents.clear();
813                     return;
814                 }
815             }
816         }
817     }
818 }