9250861b028b360836ac47328fd4aa0f4330c082
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /*
11  *
12  */
13 package org.opendaylight.controller.arphandler.internal;
14
15 import java.net.InetAddress;
16 import java.net.UnknownHostException;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.EnumSet;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.opendaylight.controller.arphandler.ARPCacheEvent;
31 import org.opendaylight.controller.arphandler.ARPEvent;
32 import org.opendaylight.controller.arphandler.ARPReply;
33 import org.opendaylight.controller.arphandler.ARPRequest;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.connectionmanager.IConnectionManager;
40 import org.opendaylight.controller.hosttracker.IfHostListener;
41 import org.opendaylight.controller.hosttracker.IfIptoHost;
42 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
43 import org.opendaylight.controller.hosttracker.hostAware.IHostFinder;
44 import org.opendaylight.controller.sal.core.ConstructionException;
45 import org.opendaylight.controller.sal.core.Node;
46 import org.opendaylight.controller.sal.core.NodeConnector;
47 import org.opendaylight.controller.sal.packet.ARP;
48 import org.opendaylight.controller.sal.packet.Ethernet;
49 import org.opendaylight.controller.sal.packet.IDataPacketService;
50 import org.opendaylight.controller.sal.packet.IListenDataPacket;
51 import org.opendaylight.controller.sal.packet.IPv4;
52 import org.opendaylight.controller.sal.packet.Packet;
53 import org.opendaylight.controller.sal.packet.PacketResult;
54 import org.opendaylight.controller.sal.packet.RawPacket;
55 import org.opendaylight.controller.sal.utils.EtherTypes;
56 import org.opendaylight.controller.sal.utils.HexEncode;
57 import org.opendaylight.controller.sal.utils.NetUtils;
58 import org.opendaylight.controller.switchmanager.ISwitchManager;
59 import org.opendaylight.controller.switchmanager.Subnet;
60 import org.opendaylight.controller.topologymanager.ITopologyManager;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateAware<ARPEvent, Boolean> {
65     private static final Logger log = LoggerFactory.getLogger(ArpHandler.class);
66     static final String ARP_EVENT_CACHE_NAME = "arphandler.arpRequestReplyEvent";
67     private IfIptoHost hostTracker;
68     private ISwitchManager switchManager;
69     private ITopologyManager topologyManager;
70     private IDataPacketService dataPacketService;
71     private IClusterContainerServices clusterContainerService;
72     private IConnectionManager connectionManager;
73     private Set<IfHostListener> hostListeners = new CopyOnWriteArraySet<IfHostListener>();
74     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
75     private ConcurrentMap<InetAddress, Short> countDownTimers;
76     private Timer periodicTimer;
77     private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
78     private Thread cacheEventHandler;
79     private boolean stopping = false;
80     /*
81      * A cluster allocated cache. Used for synchronizing ARP request/reply
82      * events across all cluster controllers. To raise an event, we put() a specific
83      * event object (as key) and all nodes handle it in the entryUpdated callback.
84      *
85      * In case of ARPReply, we put true value to send replies to any requestors
86      * by calling generateAndSendReply
87      */
88     private ConcurrentMap<ARPEvent, Boolean> arpRequestReplyEvent;
89
90     void setConnectionManager(IConnectionManager cm){
91         this.connectionManager = cm;
92     }
93
94     void unsetConnectionManager(IConnectionManager cm){
95         if (this.connectionManager == cm){
96             connectionManager = null;
97         }
98     }
99
100     void setClusterContainerService(IClusterContainerServices s){
101         this.clusterContainerService = s;
102     }
103
104     void unsetClusterContainerService(IClusterContainerServices s) {
105         if (this.clusterContainerService == s) {
106             this.clusterContainerService = null;
107         }
108     }
109
110     void setHostListener(IfHostListener s) {
111         if (this.hostListeners != null) {
112             this.hostListeners.add(s);
113         }
114     }
115
116     void unsetHostListener(IfHostListener s) {
117         if (this.hostListeners != null) {
118             this.hostListeners.remove(s);
119         }
120     }
121
122     void setDataPacketService(IDataPacketService s) {
123         this.dataPacketService = s;
124     }
125
126     void unsetDataPacketService(IDataPacketService s) {
127         if (this.dataPacketService == s) {
128             this.dataPacketService = null;
129         }
130     }
131
132     public void setHostTracker(IfIptoHost hostTracker) {
133         log.debug("Setting HostTracker");
134         this.hostTracker = hostTracker;
135     }
136
137     public void unsetHostTracker(IfIptoHost s) {
138         log.debug("UNSetting HostTracker");
139         if (this.hostTracker == s) {
140             this.hostTracker = null;
141         }
142     }
143
144     public void setTopologyManager(ITopologyManager tm) {
145         this.topologyManager = tm;
146     }
147
148     public void unsetTopologyManager(ITopologyManager tm) {
149         if (this.topologyManager == tm) {
150             this.topologyManager = null;
151         }
152     }
153
154     protected void sendARPReply(NodeConnector p, byte[] sMAC, InetAddress sIP,
155             byte[] tMAC, InetAddress tIP) {
156         byte[] senderIP = sIP.getAddress();
157         byte[] targetIP = tIP.getAddress();
158         ARP arp = createARP(ARP.REPLY,sMAC,senderIP,tMAC,targetIP);
159
160         Ethernet ethernet = createEthernet(sMAC, tMAC, arp);
161
162         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
163         destPkt.setOutgoingNodeConnector(p);
164
165         this.dataPacketService.transmitDataPacket(destPkt);
166     }
167
168     protected void handleARPPacket(Ethernet eHeader, ARP pkt, NodeConnector p) {
169
170         byte[] sourceMAC = eHeader.getSourceMACAddress();
171         byte[] targetMAC = eHeader.getDestinationMACAddress();
172         /*
173          * Sanity Check; drop ARP packets originated by the controller itself.
174          * This is to avoid continuous flooding
175          */
176         if (Arrays.equals(sourceMAC, getControllerMAC())) {
177             if (log.isDebugEnabled()) {
178                 log.debug("Receive a self originated ARP pkt (srcMAC {}) --> DROP",
179                         HexEncode.bytesToHexString(sourceMAC));
180             }
181             return;
182         }
183
184         InetAddress targetIP, sourceIP;
185         try {
186             targetIP = InetAddress.getByAddress(pkt.getTargetProtocolAddress());
187             sourceIP = InetAddress.getByAddress(pkt.getSenderProtocolAddress());
188         } catch (UnknownHostException e1) {
189             log.debug("Invalid host in ARP packet: {}", e1.getMessage());
190             return;
191         }
192
193         Subnet subnet = null;
194         if (switchManager != null) {
195             subnet = switchManager.getSubnetByNetworkAddress(sourceIP);
196         }
197         if (subnet == null) {
198             log.debug("ARPHandler: can't find subnet matching {}, drop packet", sourceIP);
199             return;
200         }
201
202         // Make sure that the host is a legitimate member of this subnet
203         if (!subnet.hasNodeConnector(p)) {
204             log.debug("{} showing up on {} does not belong to {}",
205                     new Object[] { sourceIP, p, subnet });
206             return;
207         }
208
209         HostNodeConnector requestor = null;
210         if (NetUtils.isUnicastMACAddr(sourceMAC) && p.getNode() != null) {
211             try {
212                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
213             } catch (ConstructionException e) {
214                 log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC));
215                 return;
216             }
217             /*
218              * Learn host from the received ARP REQ/REPLY, inform Host Tracker
219              */
220             log.trace("Inform Host tracker of new host {}", requestor.getNetworkAddress());
221             for (IfHostListener listener : this.hostListeners) {
222                 listener.hostListener(requestor);
223             }
224         }
225
226         /*
227          * OpCode != request -> ARP Reply. If there are hosts (in
228          * arpRequestors) waiting for the ARP reply for this sourceIP, it's
229          * time to generate the reply and send it to these hosts.
230          *
231          * If sourceIP==targetIP, it is a Gratuitous ARP. If there are hosts (in
232          * arpRequestors) waiting for the ARP reply for this sourceIP, it's time
233          * to generate the reply and send it to these hosts
234          */
235
236         if (pkt.getOpCode() != ARP.REQUEST || sourceIP.equals(targetIP)) {
237             // Raise a reply event so that any waiting requestors will be sent a reply
238             // the true value indicates we should generate replies to requestors across the cluster
239             log.trace("Received ARP reply packet from {}, reply to all requestors.", sourceIP);
240             arpRequestReplyEvent.put(new ARPReply(sourceIP, sourceMAC), true);
241             return;
242         }
243
244         /*
245          * ARP Request Handling:
246          * If targetIP is the IP of the subnet, reply with ARP REPLY
247          * If targetIP is a known host, PROXY ARP (by sending ARP REPLY) on behalf of known target hosts.
248          * For unknown target hosts, generate and send an ARP request to ALL switches/ports using
249          * the IP address defined in the subnet as source address
250          */
251         /*
252          * If target IP is gateway IP, Send ARP reply
253          */
254         if ((targetIP.equals(subnet.getNetworkAddress()))
255                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
256             if (connectionManager.isLocal(p.getNode())){
257                 if (log.isTraceEnabled()){
258                     log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}",
259                             HexEncode.bytesToHexString(getControllerMAC()));
260                 }
261                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
262             } else {
263                 log.trace("Received non-local ARP req. for default gateway. Raising reply event");
264                 arpRequestReplyEvent.put(
265                         new ARPReply(p, targetIP, getControllerMAC(), sourceIP, pkt.getSenderHardwareAddress()), false);
266             }
267             return;
268         }
269
270
271         HostNodeConnector host = hostTracker.hostQuery(targetIP);
272         // unknown host, initiate ARP request
273         if (host == null) {
274             // add the requestor to the list so that we can replay the reply
275             // when the host responds
276             if (requestor != null) {
277                 Set<HostNodeConnector> requestorSet = arpRequestors.get(targetIP);
278                 if (requestorSet == null) {
279                     requestorSet = Collections.newSetFromMap(new ConcurrentHashMap<HostNodeConnector, Boolean>());
280                     arpRequestors.put(targetIP, requestorSet);
281                 }
282                 requestorSet.add(requestor);
283                 countDownTimers.put(targetIP, (short) 2); // reset timeout to 2sec
284             }
285             //Raise a bcast request event, all controllers need to send one
286             log.trace("Sending a bcast ARP request for {}", targetIP);
287             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
288         } else {
289             /*
290              * Target host known (across the cluster), send ARP REPLY make sure that targetMAC
291              * matches the host's MAC if it is not broadcastMAC
292              */
293             if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) {
294                 log.trace("Received ARP req. for known host {}, sending reply...", targetIP);
295                 if (connectionManager.isLocal(p.getNode())) {
296                     sendARPReply(p,
297                             host.getDataLayerAddressBytes(),
298                             host.getNetworkAddress(),
299                             pkt.getSenderHardwareAddress(),
300                             sourceIP);
301                 } else {
302                     arpRequestReplyEvent.put(new ARPReply(
303                             p,
304                             host.getNetworkAddress(),
305                             host.getDataLayerAddressBytes(),
306                             sourceIP,
307                             pkt.getSenderHardwareAddress()), false);
308                 }
309             } else {
310                 /*
311                  * Target MAC has been changed. For now, discard it.
312                  * TODO: We may need to send unicast ARP REQUEST on behalf of
313                  * the target back to the sender to trigger the sender to update
314                  * its table
315                  */
316             }
317         }
318     }
319
320     /*
321      *  Send a broadcast ARP Request to the switch/ ports  using
322      *  the networkAddress of the subnet as sender IP
323      *  the controller's MAC as sender MAC
324      *  the targetIP as the target Network Address
325      */
326     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
327         log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet);
328         Set<NodeConnector> nodeConnectors;
329         if (subnet.isFlatLayer2()) {
330             nodeConnectors = new HashSet<NodeConnector>();
331             for (Node n : this.switchManager.getNodes()) {
332                 nodeConnectors.addAll(this.switchManager.getUpNodeConnectors(n));
333             }
334         } else {
335             nodeConnectors = subnet.getNodeConnectors();
336         }
337         byte[] targetHardwareAddress = new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
338         for (NodeConnector p : nodeConnectors) {
339             //fiter out any non-local or internal ports
340             if (! connectionManager.isLocal(p.getNode()) || topologyManager.isInternal(p)) {
341                 continue;
342             }
343             log.trace("Sending toward nodeConnector:{}", p);
344             byte[] senderIP = subnet.getNetworkAddress().getAddress();
345             byte[] targetIPByte = targetIP.getAddress();
346             ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetHardwareAddress, targetIPByte);
347
348             byte[] destMACAddress = new byte[] {(byte) -1, (byte) -1, (byte) -1, (byte) -1,(byte) -1,(byte) -1 };
349             Ethernet ethernet = createEthernet(getControllerMAC(), destMACAddress, arp);
350
351             // TODO For now send port-by-port, see how to optimize to
352             // send to multiple ports at once
353             RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
354             destPkt.setOutgoingNodeConnector(p);
355
356             this.dataPacketService.transmitDataPacket(destPkt);
357         }
358     }
359
360     /*
361      * Send a unicast ARP Request to the known host on a specific switch/port as
362      * defined in the host.
363      * The sender IP is the networkAddress of the subnet
364      * The sender MAC is the controller's MAC
365      */
366     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
367         log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet);
368         NodeConnector outPort = host.getnodeConnector();
369         if (outPort == null) {
370             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
371             return;
372         }
373
374         byte[] senderIP = subnet.getNetworkAddress().getAddress();
375         byte[] targetIP = host.getNetworkAddress().getAddress();
376         byte[] targetMAC = host.getDataLayerAddressBytes();
377         ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetMAC, targetIP);
378
379         Ethernet ethernet = createEthernet(getControllerMAC(), targetMAC, arp);
380
381         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
382         destPkt.setOutgoingNodeConnector(outPort);
383
384         this.dataPacketService.transmitDataPacket(destPkt);
385     }
386
387     @Override
388     public void find(InetAddress networkAddress) {
389         log.trace("Received find IP {}", networkAddress);
390
391         Subnet subnet = null;
392         if (switchManager != null) {
393             subnet = switchManager.getSubnetByNetworkAddress(networkAddress);
394         }
395         if (subnet == null) {
396             log.debug("Can't find subnet matching IP {}", networkAddress);
397             return;
398         }
399
400         // send a broadcast ARP Request to this IP
401         arpRequestReplyEvent.put(new ARPRequest(networkAddress, subnet), false);
402     }
403
404     /*
405      * Probe the host by sending a unicast ARP Request to the host
406      */
407     @Override
408     public void probe(HostNodeConnector host) {
409         log.trace("Received probe host {}", host);
410
411         Subnet subnet = null;
412         if (switchManager != null) {
413             subnet = switchManager.getSubnetByNetworkAddress(host
414                     .getNetworkAddress());
415         }
416         if (subnet == null) {
417             log.debug("can't find subnet matching {}", host.getNetworkAddress());
418             return;
419         }
420
421         if (connectionManager.isLocal(host.getnodeconnectorNode())){
422             log.trace("Send a ucast ARP req. to: {}", host);
423             sendUcastARPRequest(host, subnet);
424         } else {
425             log.trace("Raise a ucast ARP req. event to: {}", host);
426             arpRequestReplyEvent.put(new ARPRequest(host, subnet), false);
427         }
428     }
429
430     /*
431      * An IP packet is punted to the controller, this means that the
432      * destination host is not known to the controller.
433      * Need to discover it by sending a Broadcast ARP Request
434      */
435     protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) {
436
437         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
438         if (dIP == null) {
439            return;
440         }
441
442         Subnet subnet = null;
443         if (switchManager != null) {
444             subnet = switchManager.getSubnetByNetworkAddress(dIP);
445         }
446         if (subnet == null) {
447             log.debug("Can't find subnet matching {}, drop packet", dIP);
448             return;
449         }
450         log.trace("Punted IP pkt from {}, sending bcast ARP event...", dIP);
451         /*
452          * unknown destination host, initiate bcast ARP request
453          */
454         arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false);
455         return;
456     }
457
458     public byte[] getControllerMAC() {
459         if (switchManager == null) {
460             return null;
461         }
462         return switchManager.getControllerMAC();
463     }
464
465     /**
466      * Function called by the dependency manager when all the required
467      * dependencies are satisfied
468      *
469      */
470     void init() {
471         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
472         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
473         cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
474
475         allocateCaches();
476         retrieveCaches();
477     }
478
479     @SuppressWarnings({ "unchecked", "deprecation" })
480     private void retrieveCaches() {
481         ConcurrentMap<?,?> map;
482
483         if (this.clusterContainerService == null){
484             log.error("Cluster service unavailable, can't retieve ARPHandler caches!");
485             return;
486         }
487
488         map = clusterContainerService.getCache(ARP_EVENT_CACHE_NAME);
489         if (map != null){
490             this.arpRequestReplyEvent = (ConcurrentMap<ARPEvent, Boolean>) map;
491         } else {
492             log.error("Cache allocation failed for {}", ARP_EVENT_CACHE_NAME);
493         }
494     }
495
496     @SuppressWarnings("deprecation")
497     private void allocateCaches() {
498         if (clusterContainerService == null){
499             nonClusterObjectCreate();
500             log.error("Clustering service unavailable. Allocated non-cluster caches for ARPHandler.");
501             return;
502         }
503
504         try{
505             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
506                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
507         } catch (CacheConfigException e){
508             log.error("ARPHandler cache configuration invalid!");
509         } catch (CacheExistException e){
510             log.debug("ARPHandler cache exists, skipped allocation.");
511         }
512
513     }
514
515     private void nonClusterObjectCreate(){
516         arpRequestReplyEvent = new ConcurrentHashMap<ARPEvent, Boolean>();
517     }
518     /**
519      * Function called by the dependency manager when at least one
520      * dependency become unsatisfied or when the component is shutting
521      * down because for example bundle is being stopped.
522      *
523      */
524     void destroy() {
525         cacheEventHandler.interrupt();
526     }
527
528     /**
529      * Function called by dependency manager after "init ()" is called
530      * and after the services provided by the class are registered in
531      * the service registry
532      *
533      */
534     void start() {
535         stopping = false;
536         startPeriodicTimer();
537         cacheEventHandler.start();
538     }
539
540     /**
541      * Function called by the dependency manager before the services
542      * exported by the component are unregistered, this will be
543      * followed by a "destroy ()" calls
544      *
545      */
546     void stop(){
547     }
548
549     void stopping() {
550         stopping = true;
551         cancelPeriodicTimer();
552     }
553
554     void setSwitchManager(ISwitchManager s) {
555         log.debug("SwitchManager service set.");
556         this.switchManager = s;
557     }
558
559     void unsetSwitchManager(ISwitchManager s) {
560         if (this.switchManager == s) {
561             log.debug("SwitchManager service UNset.");
562             this.switchManager = null;
563         }
564     }
565
566     @Override
567     public PacketResult receiveDataPacket(RawPacket inPkt) {
568         if (inPkt == null) {
569             return PacketResult.IGNORED;
570         }
571         log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
572         Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
573         if (formattedPak instanceof Ethernet) {
574             Object nextPak = formattedPak.getPayload();
575             if (nextPak instanceof IPv4) {
576                 log.trace("Handle IP packet: {}", formattedPak);
577                 handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
578             } else if (nextPak instanceof ARP) {
579                 log.trace("Handle ARP packet: {}", formattedPak);
580                 handleARPPacket((Ethernet) formattedPak, (ARP) nextPak, inPkt
581                         .getIncomingNodeConnector());
582             }
583         }
584         return PacketResult.IGNORED;
585     }
586
587     private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress, byte[] targetIP) {
588             ARP arp = new ARP();
589             arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
590             arp.setProtocolType(EtherTypes.IPv4.shortValue());
591             arp.setHardwareAddressLength((byte) 6);
592             arp.setProtocolAddressLength((byte) 4);
593             arp.setOpCode(opCode);
594             arp.setSenderHardwareAddress(senderMacAddress) ;
595             arp.setSenderProtocolAddress(senderIP);
596             arp.setTargetHardwareAddress(targetMacAddress);
597             arp.setTargetProtocolAddress(targetIP);
598             return arp;
599     }
600
601     private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) {
602         Ethernet ethernet = new Ethernet();
603         ethernet.setSourceMACAddress(sourceMAC);
604         ethernet.setDestinationMACAddress(targetMAC);
605         ethernet.setEtherType(EtherTypes.ARP.shortValue());
606         ethernet.setPayload(arp);
607         return ethernet;
608     }
609
610     private void startPeriodicTimer() {
611         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
612         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
613             @SuppressWarnings("deprecation")
614             @Override
615             public void run() {
616                 Set<InetAddress> targetIPs = countDownTimers.keySet();
617                 Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
618                 for (InetAddress t : targetIPs) {
619                     short tick = countDownTimers.get(t);
620                     tick--;
621                     if (tick <= 0) {
622                         expiredTargets.add(t);
623                     } else {
624                         countDownTimers.replace(t, tick);
625                     }
626                 }
627                 for (InetAddress tIP : expiredTargets) {
628                     countDownTimers.remove(tIP);
629                     // Remove the requestor(s) who have been waiting for the ARP
630                     // reply from this target for more than 1sec
631                     arpRequestors.remove(tIP);
632                     log.debug("ARP reply was not received from {}", tIP);
633                 }
634
635                 // Clean up ARP event cache
636                 try {
637                     if (clusterContainerService.amICoordinator() && ! arpRequestReplyEvent.isEmpty()){
638                         arpRequestReplyEvent.clear();
639                     }
640                 } catch (Exception e){
641                     log.warn("ARPHandler: A cluster member failed to clear event cache.");
642                 }
643             }
644         }, 0, 1000);
645     }
646
647     private void cancelPeriodicTimer() {
648         if (this.periodicTimer != null) {
649             this.periodicTimer.cancel();
650         }
651     }
652
653     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
654         if (log.isTraceEnabled()) {
655             log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
656                       HexEncode.bytesToHexString(sourceMAC));
657         }
658         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
659         if ((hosts == null) || hosts.isEmpty()) {
660             log.trace("Bailing out no requestors Hosts");
661             return;
662         }
663         countDownTimers.remove(sourceIP);
664         for (HostNodeConnector host : hosts) {
665             if (log.isTraceEnabled()) {
666                 log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
667                           new Object[] {
668                               HexEncode.bytesToHexString(sourceMAC),
669                               sourceIP,
670                               HexEncode.bytesToHexString(host.getDataLayerAddressBytes()),
671                               host.getNetworkAddress() });
672             }
673             if (connectionManager.isLocal(host.getnodeconnectorNode())){
674                 sendARPReply(host.getnodeConnector(),
675                         sourceMAC,
676                         sourceIP,
677                         host.getDataLayerAddressBytes(),
678                         host.getNetworkAddress());
679             } else {
680                 /*
681                  * In the remote event a requestor moved to another
682                  * controller it may turn out it now we need to send
683                  * the ARP reply from a different controller, this
684                  * cover the case
685                  */
686                 arpRequestReplyEvent.put(
687                         new ARPReply(
688                             host.getnodeConnector(),
689                             sourceIP,
690                             sourceMAC,
691                             host.getNetworkAddress(),
692                             host.getDataLayerAddressBytes()), false);
693             }
694         }
695     }
696
697
698     @Override
699     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
700         log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
701         enqueueARPCacheEvent(key, new_value);
702     }
703
704     @Override
705     public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
706         // nothing to do
707     }
708     @Override
709     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
710         // nothing to do
711     }
712
713     private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
714         try {
715             ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
716             if (!ARPCacheEvents.contains(cacheEvent)) {
717                 this.ARPCacheEvents.add(cacheEvent);
718                 log.trace("Enqueued {}", event);
719             }
720         } catch (Exception e) {
721             log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
722         }
723     }
724
725     /*
726      * this thread monitors the connectionEvent queue for new incoming events from
727      */
728     private class ARPCacheEventHandler implements Runnable {
729         @Override
730         public void run() {
731             while (!stopping) {
732                 try {
733                     ARPCacheEvent ev = ARPCacheEvents.take();
734                     ARPEvent event = ev.getEvent();
735                     if (event instanceof ARPRequest) {
736                         ARPRequest req = (ARPRequest) event;
737                         // If broadcast request
738                         if (req.getHost() == null) {
739                             log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
740                             sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
741
742                         //If unicast and local, send reply
743                         } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
744                             log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
745                             sendUcastARPRequest(req.getHost(), req.getSubnet());
746                         }
747                     } else if (event instanceof ARPReply) {
748                         ARPReply rep = (ARPReply) event;
749                         // New reply received by controller, notify all awaiting requestors across the cluster
750                         if (ev.isNewReply()) {
751                             log.trace("Trigger a generateAndSendReply in response to {}", rep);
752                             generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
753                         // Otherwise, a specific reply. If local, send out.
754                         } else if (connectionManager.isLocal(rep.getPort().getNode())) {
755                             log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
756                             sendARPReply(rep.getPort(),
757                                     rep.getSourceMac(),
758                                     rep.getSourceIP(),
759                                     rep.getTargetMac(),
760                                     rep.getTargetIP());
761                         }
762                     }
763                 } catch (InterruptedException e) {
764                     ARPCacheEvents.clear();
765                     return;
766                 }
767             }
768         }
769     }
770 }