Merge "Migrating caches to TRANSACTIONAL Caches and enabled use1PcForAutoCommitTransa...
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /*
11  *
12  */
13 package org.opendaylight.controller.arphandler.internal;
14
15 import java.net.InetAddress;
16 import java.net.UnknownHostException;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.EnumSet;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.opendaylight.controller.arphandler.ARPCacheEvent;
31 import org.opendaylight.controller.arphandler.ARPEvent;
32 import org.opendaylight.controller.arphandler.ARPReply;
33 import org.opendaylight.controller.arphandler.ARPRequest;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.connectionmanager.IConnectionManager;
40 import org.opendaylight.controller.hosttracker.IfHostListener;
41 import org.opendaylight.controller.hosttracker.IfIptoHost;
42 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
43 import org.opendaylight.controller.hosttracker.hostAware.IHostFinder;
44 import org.opendaylight.controller.sal.core.ConstructionException;
45 import org.opendaylight.controller.sal.core.Node;
46 import org.opendaylight.controller.sal.core.NodeConnector;
47 import org.opendaylight.controller.sal.packet.ARP;
48 import org.opendaylight.controller.sal.packet.Ethernet;
49 import org.opendaylight.controller.sal.packet.IDataPacketService;
50 import org.opendaylight.controller.sal.packet.IListenDataPacket;
51 import org.opendaylight.controller.sal.packet.IPv4;
52 import org.opendaylight.controller.sal.packet.Packet;
53 import org.opendaylight.controller.sal.packet.PacketResult;
54 import org.opendaylight.controller.sal.packet.RawPacket;
55 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
56 import org.opendaylight.controller.sal.utils.EtherTypes;
57 import org.opendaylight.controller.sal.utils.HexEncode;
58 import org.opendaylight.controller.sal.utils.NetUtils;
59 import org.opendaylight.controller.switchmanager.ISwitchManager;
60 import org.opendaylight.controller.switchmanager.Subnet;
61 import org.opendaylight.controller.topologymanager.ITopologyManager;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateAware<ARPEvent, Boolean> {
66     private static final Logger log = LoggerFactory.getLogger(ArpHandler.class);
67     static final String ARP_EVENT_CACHE_NAME = "arphandler.arpRequestReplyEvent";
68     private IfIptoHost hostTracker;
69     private ISwitchManager switchManager;
70     private ITopologyManager topologyManager;
71     private IDataPacketService dataPacketService;
72     private IClusterContainerServices clusterContainerService;
73     private IConnectionManager connectionManager;
74     private Set<IfHostListener> hostListeners = new CopyOnWriteArraySet<IfHostListener>();
75     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
76     private ConcurrentMap<InetAddress, Short> countDownTimers;
77     private Timer periodicTimer;
78     private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
79     Thread cacheEventHandler;
80     /*
81      * A cluster allocated cache. Used for synchronizing ARP request/reply
82      * events across all cluster controllers. To raise an event, we put() a specific
83      * event object (as key) and all nodes handle it in the entryUpdated callback.
84      *
85      * In case of ARPReply, we put true value to send replies to any requestors
86      * by calling generateAndSendReply
87      */
88     private ConcurrentMap<ARPEvent, Boolean> arpRequestReplyEvent;
89
90     void setConnectionManager(IConnectionManager cm){
91         this.connectionManager = cm;
92     }
93
94     void unsetConnectionManager(IConnectionManager cm){
95         if (this.connectionManager == cm){
96             connectionManager = null;
97         }
98     }
99
100     void setClusterContainerService(IClusterContainerServices s){
101         this.clusterContainerService = s;
102     }
103
104     void unsetClusterContainerService(IClusterContainerServices s) {
105         if (this.clusterContainerService == s) {
106             this.clusterContainerService = null;
107         }
108     }
109
110     void setHostListener(IfHostListener s) {
111         if (this.hostListeners != null) {
112             this.hostListeners.add(s);
113         }
114     }
115
116     void unsetHostListener(IfHostListener s) {
117         if (this.hostListeners != null) {
118             this.hostListeners.remove(s);
119         }
120     }
121
122     void setDataPacketService(IDataPacketService s) {
123         this.dataPacketService = s;
124     }
125
126     void unsetDataPacketService(IDataPacketService s) {
127         if (this.dataPacketService == s) {
128             this.dataPacketService = null;
129         }
130     }
131
132     public void setHostTracker(IfIptoHost hostTracker) {
133         log.debug("Setting HostTracker");
134         this.hostTracker = hostTracker;
135     }
136
137     public void unsetHostTracker(IfIptoHost s) {
138         log.debug("UNSetting HostTracker");
139         if (this.hostTracker == s) {
140             this.hostTracker = null;
141         }
142     }
143
144     public void setTopologyManager(ITopologyManager tm) {
145         this.topologyManager = tm;
146     }
147
148     public void unsetTopologyManager(ITopologyManager tm) {
149         if (this.topologyManager == tm) {
150             this.topologyManager = null;
151         }
152     }
153
154     protected void sendARPReply(NodeConnector p, byte[] sMAC, InetAddress sIP,
155             byte[] tMAC, InetAddress tIP) {
156         byte[] senderIP = sIP.getAddress();
157         byte[] targetIP = tIP.getAddress();
158         ARP arp = new ARP();
159         arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
160             .setProtocolType(EtherTypes.IPv4.shortValue())
161             .setHardwareAddressLength((byte) 6)
162             .setProtocolAddressLength((byte) 4)
163             .setOpCode(ARP.REPLY)
164             .setSenderHardwareAddress(sMAC)
165             .setSenderProtocolAddress(senderIP)
166             .setTargetHardwareAddress(tMAC)
167             .setTargetProtocolAddress(targetIP);
168
169         Ethernet ethernet = new Ethernet();
170         ethernet.setSourceMACAddress(sMAC)
171             .setDestinationMACAddress(tMAC)
172             .setEtherType(EtherTypes.ARP.shortValue())
173             .setPayload(arp);
174
175         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
176         destPkt.setOutgoingNodeConnector(p);
177
178         this.dataPacketService.transmitDataPacket(destPkt);
179     }
180
181     protected void handleARPPacket(Ethernet eHeader, ARP pkt, NodeConnector p) {
182
183         byte[] sourceMAC = eHeader.getSourceMACAddress();
184         byte[] targetMAC = eHeader.getDestinationMACAddress();
185         /*
186          * Sanity Check; drop ARP packets originated by the controller itself.
187          * This is to avoid continuous flooding
188          */
189         if (Arrays.equals(sourceMAC, getControllerMAC())) {
190             if (log.isDebugEnabled()) {
191                 log.debug("Receive a self originated ARP pkt (srcMAC {}) --> DROP",
192                         HexEncode.bytesToHexString(sourceMAC));
193             }
194             return;
195         }
196
197         InetAddress targetIP, sourceIP;
198         try {
199             targetIP = InetAddress.getByAddress(pkt.getTargetProtocolAddress());
200             sourceIP = InetAddress.getByAddress(pkt.getSenderProtocolAddress());
201         } catch (UnknownHostException e1) {
202             log.debug("Invalid host in ARP packet: {}", e1.getMessage());
203             return;
204         }
205
206         Subnet subnet = null;
207         if (switchManager != null) {
208             subnet = switchManager.getSubnetByNetworkAddress(sourceIP);
209         }
210         if (subnet == null) {
211             log.debug("ARPHandler: can't find subnet matching {}, drop packet", sourceIP);
212             return;
213         }
214
215         // Make sure that the host is a legitimate member of this subnet
216         if (!subnet.hasNodeConnector(p)) {
217             log.debug("{} showing up on {} does not belong to {}",
218                     new Object[] { sourceIP, p, subnet });
219             return;
220         }
221
222         HostNodeConnector requestor = null;
223         if (NetUtils.isUnicastMACAddr(sourceMAC) && p.getNode() != null) {
224             try {
225                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
226             } catch (ConstructionException e) {
227                 log.debug("Received ARP packet with invalid MAC: {}", sourceMAC);
228                 return;
229             }
230             /*
231              * Learn host from the received ARP REQ/REPLY, inform Host Tracker
232              */
233             log.trace("Inform Host tracker of new host {}", requestor.getNetworkAddress());
234             for (IfHostListener listener : this.hostListeners) {
235                 listener.hostListener(requestor);
236             }
237         }
238
239         /*
240          * OpCode != request -> ARP Reply. If there are hosts (in
241          * arpRequestors) waiting for the ARP reply for this sourceIP, it's
242          * time to generate the reply and send it to these hosts.
243          *
244          * If sourceIP==targetIP, it is a Gratuitous ARP. If there are hosts (in
245          * arpRequestors) waiting for the ARP reply for this sourceIP, it's time
246          * to generate the reply and send it to these hosts
247          */
248
249         if (pkt.getOpCode() != ARP.REQUEST || sourceIP.equals(targetIP)) {
250             // Raise a reply event so that any waiting requestors will be sent a reply
251             // the true value indicates we should generate replies to requestors across the cluster
252             log.trace("Received ARP reply packet from {}, reply to all requestors.", sourceIP);
253             arpRequestReplyEvent.put(new ARPReply(sourceIP, sourceMAC), true);
254             return;
255         }
256
257         /*
258          * ARP Request Handling:
259          * If targetIP is the IP of the subnet, reply with ARP REPLY
260          * If targetIP is a known host, PROXY ARP (by sending ARP REPLY) on behalf of known target hosts.
261          * For unknown target hosts, generate and send an ARP request to ALL switches/ports using
262          * the IP address defined in the subnet as source address
263          */
264         /*
265          * If target IP is gateway IP, Send ARP reply
266          */
267         if ((targetIP.equals(subnet.getNetworkAddress()))
268                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
269             if (connectionManager.isLocal(p.getNode())){
270                 if (log.isTraceEnabled()){
271                     log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}", getControllerMAC());
272                 }
273                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
274             } else {
275                 log.trace("Received non-local ARP req. for default gateway. Raising reply event");
276                 arpRequestReplyEvent.put(
277                         new ARPReply(p, targetIP, getControllerMAC(), sourceIP, pkt.getSenderHardwareAddress()), false);
278             }
279             return;
280         }
281
282
283         HostNodeConnector host = hostTracker.hostQuery(targetIP);
284         // unknown host, initiate ARP request
285         if (host == null) {
286             // add the requestor to the list so that we can replay the reply
287             // when the host responds
288             if (requestor != null) {
289                 Set<HostNodeConnector> requestorSet = arpRequestors.get(targetIP);
290                 if (requestorSet == null) {
291                     requestorSet = Collections.newSetFromMap(new ConcurrentHashMap<HostNodeConnector, Boolean>());
292                     arpRequestors.put(targetIP, requestorSet);
293                 }
294                 requestorSet.add(requestor);
295                 countDownTimers.put(targetIP, (short) 2); // reset timeout to 2sec
296             }
297             //Raise a bcast request event, all controllers need to send one
298             log.trace("Sending a bcast ARP request for {}", targetIP);
299             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
300
301         } else {
302             /*
303              * Target host known (across the cluster), send ARP REPLY make sure that targetMAC
304              * matches the host's MAC if it is not broadcastMAC
305              */
306             if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) {
307                 log.trace("Received ARP req. for known host {}, sending reply...", targetIP);
308                 if (connectionManager.isLocal(p.getNode())) {
309                     sendARPReply(p,
310                             host.getDataLayerAddressBytes(),
311                             host.getNetworkAddress(),
312                             pkt.getSenderHardwareAddress(),
313                             sourceIP);
314                 } else {
315                     arpRequestReplyEvent.put(new ARPReply(
316                             p,
317                             host.getNetworkAddress(),
318                             host.getDataLayerAddressBytes(),
319                             sourceIP,
320                             pkt.getSenderHardwareAddress()), false);
321                 }
322             } else {
323                 /*
324                  * Target MAC has been changed. For now, discard it.
325                  * TODO: We may need to send unicast ARP REQUEST on behalf of
326                  * the target back to the sender to trigger the sender to update
327                  * its table
328                  */
329             }
330         }
331     }
332
333     /*
334      *  Send a broadcast ARP Request to the switch/ ports  using
335      *  the networkAddress of the subnet as sender IP
336      *  the controller's MAC as sender MAC
337      *  the targetIP as the target Network Address
338      */
339     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
340         Set<NodeConnector> nodeConnectors;
341         if (subnet.isFlatLayer2()) {
342             nodeConnectors = new HashSet<NodeConnector>();
343             for (Node n : this.switchManager.getNodes()) {
344                 nodeConnectors.addAll(this.switchManager.getUpNodeConnectors(n));
345             }
346         } else {
347             nodeConnectors = subnet.getNodeConnectors();
348         }
349
350         for (NodeConnector p : nodeConnectors) {
351
352             //fiter out any non-local or internal ports
353             if (! connectionManager.isLocal(p.getNode()) || topologyManager.isInternal(p)) {
354                 continue;
355             }
356             ARP arp = new ARP();
357             byte[] senderIP = subnet.getNetworkAddress().getAddress();
358             byte[] targetIPByte = targetIP.getAddress();
359             arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
360                .setProtocolType(EtherTypes.IPv4.shortValue())
361                .setHardwareAddressLength((byte) 6)
362                .setProtocolAddressLength((byte) 4)
363                .setOpCode(ARP.REQUEST)
364                .setSenderHardwareAddress(getControllerMAC())
365                .setSenderProtocolAddress(senderIP)
366                .setTargetHardwareAddress(
367                        new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 })
368                .setTargetProtocolAddress(targetIPByte);
369
370             Ethernet ethernet = new Ethernet();
371             ethernet.setSourceMACAddress(getControllerMAC())
372                     .setDestinationMACAddress(new byte[] {(byte) -1,
373                                                           (byte) -1,
374                                                           (byte) -1,
375                                                           (byte) -1,
376                                                           (byte) -1,
377                                                           (byte) -1 })
378                     .setEtherType(EtherTypes.ARP.shortValue()).setPayload(arp);
379
380             // TODO For now send port-by-port, see how to optimize to
381             // send to multiple ports at once
382             RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
383             destPkt.setOutgoingNodeConnector(p);
384
385             this.dataPacketService.transmitDataPacket(destPkt);
386         }
387     }
388
389     /*
390      * Send a unicast ARP Request to the known host on a specific switch/port as
391      * defined in the host.
392      * The sender IP is the networkAddress of the subnet
393      * The sender MAC is the controller's MAC
394      */
395     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
396
397         NodeConnector outPort = host.getnodeConnector();
398         if (outPort == null) {
399             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
400             return;
401         }
402
403         byte[] senderIP = subnet.getNetworkAddress().getAddress();
404         byte[] targetIP = host.getNetworkAddress().getAddress();
405         byte[] targetMAC = host.getDataLayerAddressBytes();
406         ARP arp = new ARP();
407         arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
408             .setProtocolType(EtherTypes.IPv4.shortValue())
409             .setHardwareAddressLength((byte) 6)
410             .setProtocolAddressLength((byte) 4)
411             .setOpCode(ARP.REQUEST)
412             .setSenderHardwareAddress(getControllerMAC())
413             .setSenderProtocolAddress(senderIP)
414             .setTargetHardwareAddress(targetMAC)
415             .setTargetProtocolAddress(targetIP);
416
417         Ethernet ethernet = new Ethernet();
418         ethernet.setSourceMACAddress(getControllerMAC())
419                 .setDestinationMACAddress(targetMAC)
420                 .setEtherType(EtherTypes.ARP.shortValue())
421                 .setPayload(arp);
422
423         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
424         destPkt.setOutgoingNodeConnector(outPort);
425
426         this.dataPacketService.transmitDataPacket(destPkt);
427     }
428
429     public void find(InetAddress networkAddress) {
430         log.trace("Received find IP {}", networkAddress);
431
432         Subnet subnet = null;
433         if (switchManager != null) {
434             subnet = switchManager.getSubnetByNetworkAddress(networkAddress);
435         }
436         if (subnet == null) {
437             log.debug("Can't find subnet matching IP {}", networkAddress);
438             return;
439         }
440
441         // send a broadcast ARP Request to this IP
442         arpRequestReplyEvent.put(new ARPRequest(networkAddress, subnet), false);
443     }
444
445     /*
446      * Probe the host by sending a unicast ARP Request to the host
447      */
448     public void probe(HostNodeConnector host) {
449         log.trace("Received probe host {}", host);
450
451         Subnet subnet = null;
452         if (switchManager != null) {
453             subnet = switchManager.getSubnetByNetworkAddress(host
454                     .getNetworkAddress());
455         }
456         if (subnet == null) {
457             log.debug("can't find subnet matching {}", host.getNetworkAddress());
458             return;
459         }
460
461         if (connectionManager.isLocal(host.getnodeconnectorNode())){
462             log.trace("Send a ucast ARP req. to: {}", host);
463             sendUcastARPRequest(host, subnet);
464         } else {
465             log.trace("Raise a ucast ARP req. event to: {}", host);
466             arpRequestReplyEvent.put(new ARPRequest(host, subnet), false);
467         }
468     }
469
470     /*
471      * An IP packet is punted to the controller, this means that the
472      * destination host is not known to the controller.
473      * Need to discover it by sending a Broadcast ARP Request
474      */
475     protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) {
476
477         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
478         if (dIP == null) {
479            return;
480         }
481
482         Subnet subnet = null;
483         if (switchManager != null) {
484             subnet = switchManager.getSubnetByNetworkAddress(dIP);
485         }
486         if (subnet == null) {
487             log.debug("Can't find subnet matching {}, drop packet", dIP);
488             return;
489         }
490         log.trace("Punted IP pkt from {}, sending bcast ARP event...", dIP);
491         /*
492          * unknown destination host, initiate bcast ARP request
493          */
494         arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false);
495         return;
496     }
497
498     public byte[] getControllerMAC() {
499         if (switchManager == null) {
500             return null;
501         }
502         return switchManager.getControllerMAC();
503     }
504
505     /**
506      * Function called by the dependency manager when all the required
507      * dependencies are satisfied
508      *
509      */
510     void init() {
511         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
512         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
513         cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
514
515         allocateCaches();
516         retrieveCaches();
517     }
518
519     @SuppressWarnings({ "unchecked", "deprecation" })
520     private void retrieveCaches() {
521         ConcurrentMap<?,?> map;
522
523         if (this.clusterContainerService == null){
524             log.error("Cluster service unavailable, can't retieve ARPHandler caches!");
525             return;
526         }
527
528         map = clusterContainerService.getCache(ARP_EVENT_CACHE_NAME);
529         if (map != null){
530             this.arpRequestReplyEvent = (ConcurrentMap<ARPEvent, Boolean>) map;
531         } else {
532             log.error("Cache allocation failed for {}", ARP_EVENT_CACHE_NAME);
533         }
534     }
535
536     @SuppressWarnings("deprecation")
537     private void allocateCaches() {
538         if (clusterContainerService == null){
539             nonClusterObjectCreate();
540             log.error("Clustering service unavailable. Allocated non-cluster caches for ARPHandler.");
541             return;
542         }
543
544         try{
545             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
546                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
547         } catch (CacheConfigException e){
548             log.error("ARPHandler cache configuration invalid!");
549         } catch (CacheExistException e){
550             log.debug("ARPHandler cache exists, skipped allocation.");
551         }
552
553     }
554
555     private void nonClusterObjectCreate(){
556         arpRequestReplyEvent = new ConcurrentHashMap<ARPEvent, Boolean>();
557     }
558     /**
559      * Function called by the dependency manager when at least one
560      * dependency become unsatisfied or when the component is shutting
561      * down because for example bundle is being stopped.
562      *
563      */
564     void destroy() {
565     }
566
567     /**
568      * Function called by dependency manager after "init ()" is called
569      * and after the services provided by the class are registered in
570      * the service registry
571      *
572      */
573     void start() {
574         startPeriodicTimer();
575         cacheEventHandler.start();
576
577     }
578
579     /**
580      * Function called by the dependency manager before the services
581      * exported by the component are unregistered, this will be
582      * followed by a "destroy ()" calls
583      *
584      */
585     void stop(){
586     }
587
588     void stopping() {
589         cancelPeriodicTimer();
590     }
591
592     void setSwitchManager(ISwitchManager s) {
593         log.debug("SwitchManager service set.");
594         this.switchManager = s;
595     }
596
597     void unsetSwitchManager(ISwitchManager s) {
598         if (this.switchManager == s) {
599             log.debug("SwitchManager service UNset.");
600             this.switchManager = null;
601         }
602     }
603
604     @Override
605     public PacketResult receiveDataPacket(RawPacket inPkt) {
606         if (inPkt == null) {
607             return PacketResult.IGNORED;
608         }
609         log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
610         Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
611         if (formattedPak instanceof Ethernet) {
612             Object nextPak = formattedPak.getPayload();
613             if (nextPak instanceof IPv4) {
614                 log.trace("Handle IP packet: {}", formattedPak);
615                 handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
616             } else if (nextPak instanceof ARP) {
617                 log.trace("Handle ARP packet: {}", formattedPak);
618                 handleARPPacket((Ethernet) formattedPak, (ARP) nextPak, inPkt
619                         .getIncomingNodeConnector());
620             }
621         }
622         return PacketResult.IGNORED;
623     }
624
625     private void startPeriodicTimer() {
626         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
627         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
628             @SuppressWarnings("deprecation")
629             @Override
630             public void run() {
631                 Set<InetAddress> targetIPs = countDownTimers.keySet();
632                 Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
633                 for (InetAddress t : targetIPs) {
634                     short tick = countDownTimers.get(t);
635                     tick--;
636                     if (tick <= 0) {
637                         expiredTargets.add(t);
638                     } else {
639                         countDownTimers.replace(t, tick);
640                     }
641                 }
642                 for (InetAddress tIP : expiredTargets) {
643                     countDownTimers.remove(tIP);
644                     // Remove the requestor(s) who have been waiting for the ARP
645                     // reply from this target for more than 1sec
646                     arpRequestors.remove(tIP);
647                     log.debug("ARP reply was not received from {}", tIP);
648                 }
649
650                 // Clean up ARP event cache
651                 try {
652                     if (clusterContainerService.amICoordinator() && ! arpRequestReplyEvent.isEmpty()){
653                         arpRequestReplyEvent.clear();
654                     }
655                 } catch (Exception e){
656                     log.warn("ARPHandler: A cluster member failed to clear event cache.");
657                 }
658             }
659         }, 0, 1000);
660     }
661
662     private void cancelPeriodicTimer() {
663         if (this.periodicTimer != null) {
664             this.periodicTimer.cancel();
665         }
666     }
667
668     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
669         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
670         if ((hosts == null) || hosts.isEmpty()) {
671             return;
672         }
673         countDownTimers.remove(sourceIP);
674         for (HostNodeConnector host : hosts) {
675             log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
676                     new Object[] { sourceMAC, sourceIP, host.getDataLayerAddressBytes(), host.getNetworkAddress() });
677
678             if (connectionManager.isLocal(host.getnodeconnectorNode())){
679                 sendARPReply(host.getnodeConnector(),
680                         sourceMAC,
681                         sourceIP,
682                         host.getDataLayerAddressBytes(),
683                         host.getNetworkAddress());
684             } else {
685                 arpRequestReplyEvent.put(
686                         new ARPReply(
687                             host.getnodeConnector(),
688                             sourceIP,
689                             sourceMAC,
690                             host.getNetworkAddress(),
691                             host.getDataLayerAddressBytes()), false);
692             }
693         }
694     }
695
696
697     @Override
698     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
699         enqueueARPCacheEvent(key, new_value);
700     }
701
702     @Override
703     public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
704         // nothing to do
705     }
706     @Override
707     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
708         // nothing to do
709     }
710
711     private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
712         try {
713             ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
714             if (!ARPCacheEvents.contains(cacheEvent)) {
715                 this.ARPCacheEvents.add(cacheEvent);
716             }
717         } catch (Exception e) {
718             log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
719         }
720     }
721
722     /*
723      * this thread monitors the connectionEvent queue for new incoming events from
724      */
725     private class ARPCacheEventHandler implements Runnable {
726         @Override
727         public void run() {
728             while (true) {
729                 try {
730                     ARPCacheEvent ev = ARPCacheEvents.take();
731                     ARPEvent event = ev.getEvent();
732                     if (event instanceof ARPRequest) {
733                         ARPRequest req = (ARPRequest) event;
734                         // If broadcast request
735                         if (req.getHost() == null) {
736                             sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
737
738                         //If unicast and local, send reply
739                         } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
740                             sendUcastARPRequest(req.getHost(), req.getSubnet());
741                         }
742                     } else if (event instanceof ARPReply) {
743                         ARPReply rep = (ARPReply) event;
744                         // New reply received by controller, notify all awaiting requestors across the cluster
745                         if (ev.isNewReply()) {
746                             generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
747
748                         // Otherwise, a specific reply. If local, send out.
749                         } else if (connectionManager.isLocal(rep.getPort().getNode())) {
750                             sendARPReply(rep.getPort(),
751                                     rep.getSourceMac(),
752                                     rep.getSourceIP(),
753                                     rep.getTargetMac(),
754                                     rep.getTargetIP());
755                         }
756                     }
757                 } catch (InterruptedException e) {
758                     ARPCacheEvents.clear();
759                     return;
760                 }
761             }
762         }
763     }
764 }