Merge "HA - ARPHandler Event sync"
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /*
11  *
12  */
13 package org.opendaylight.controller.arphandler.internal;
14
15 import java.net.InetAddress;
16 import java.net.UnknownHostException;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.EnumSet;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.CopyOnWriteArraySet;
27
28 import org.opendaylight.controller.arphandler.ARPEvent;
29 import org.opendaylight.controller.arphandler.ARPReply;
30 import org.opendaylight.controller.arphandler.ARPRequest;
31 import org.opendaylight.controller.clustering.services.CacheConfigException;
32 import org.opendaylight.controller.clustering.services.CacheExistException;
33 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
34 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
35 import org.opendaylight.controller.clustering.services.IClusterServices;
36 import org.opendaylight.controller.connectionmanager.IConnectionManager;
37 import org.opendaylight.controller.hosttracker.IfHostListener;
38 import org.opendaylight.controller.hosttracker.IfIptoHost;
39 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
40 import org.opendaylight.controller.hosttracker.hostAware.IHostFinder;
41 import org.opendaylight.controller.sal.core.ConstructionException;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.packet.ARP;
45 import org.opendaylight.controller.sal.packet.Ethernet;
46 import org.opendaylight.controller.sal.packet.IDataPacketService;
47 import org.opendaylight.controller.sal.packet.IListenDataPacket;
48 import org.opendaylight.controller.sal.packet.IPv4;
49 import org.opendaylight.controller.sal.packet.Packet;
50 import org.opendaylight.controller.sal.packet.PacketResult;
51 import org.opendaylight.controller.sal.packet.RawPacket;
52 import org.opendaylight.controller.sal.utils.EtherTypes;
53 import org.opendaylight.controller.sal.utils.HexEncode;
54 import org.opendaylight.controller.sal.utils.NetUtils;
55 import org.opendaylight.controller.switchmanager.ISwitchManager;
56 import org.opendaylight.controller.switchmanager.Subnet;
57 import org.opendaylight.controller.topologymanager.ITopologyManager;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateAware<ARPEvent, Boolean> {
62     private static final Logger log = LoggerFactory.getLogger(ArpHandler.class);
63     static final String ARP_EVENT_CACHE_NAME = "arphandler.arpRequestReplyEvent";
64     private IfIptoHost hostTracker;
65     private ISwitchManager switchManager;
66     private ITopologyManager topologyManager;
67     private IDataPacketService dataPacketService;
68     private IClusterContainerServices clusterContainerService;
69     private IConnectionManager connectionManager;
70     private Set<IfHostListener> hostListeners = new CopyOnWriteArraySet<IfHostListener>();
71     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
72     private ConcurrentMap<InetAddress, Short> countDownTimers;
73     private Timer periodicTimer;
74     /*
75      * A cluster allocated cache. Used for synchronizing ARP request/reply
76      * events across all cluster controllers. To raise an event, we put() a specific
77      * event object (as key) and all nodes handle it in the entryUpdated callback.
78      *
79      * In case of ARPReply, we put true value to send replies to any requestors
80      * by calling generateAndSendReply
81      */
82     private ConcurrentMap<ARPEvent, Boolean> arpRequestReplyEvent;
83
84     void setConnectionManager(IConnectionManager cm){
85         this.connectionManager = cm;
86     }
87
88     void unsetConnectionManager(IConnectionManager cm){
89         if (this.connectionManager == cm){
90             connectionManager = null;
91         }
92     }
93
94     void setClusterContainerService(IClusterContainerServices s){
95         this.clusterContainerService = s;
96     }
97
98     void unsetClusterContainerService(IClusterContainerServices s) {
99         if (this.clusterContainerService == s) {
100             this.clusterContainerService = null;
101         }
102     }
103
104     void setHostListener(IfHostListener s) {
105         if (this.hostListeners != null) {
106             this.hostListeners.add(s);
107         }
108     }
109
110     void unsetHostListener(IfHostListener s) {
111         if (this.hostListeners != null) {
112             this.hostListeners.remove(s);
113         }
114     }
115
116     void setDataPacketService(IDataPacketService s) {
117         this.dataPacketService = s;
118     }
119
120     void unsetDataPacketService(IDataPacketService s) {
121         if (this.dataPacketService == s) {
122             this.dataPacketService = null;
123         }
124     }
125
126     public void setHostTracker(IfIptoHost hostTracker) {
127         log.debug("Setting HostTracker");
128         this.hostTracker = hostTracker;
129     }
130
131     public void unsetHostTracker(IfIptoHost s) {
132         log.debug("UNSetting HostTracker");
133         if (this.hostTracker == s) {
134             this.hostTracker = null;
135         }
136     }
137
138     public void setTopologyManager(ITopologyManager tm) {
139         this.topologyManager = tm;
140     }
141
142     public void unsetTopologyManager(ITopologyManager tm) {
143         if (this.topologyManager == tm) {
144             this.topologyManager = null;
145         }
146     }
147
148     protected void sendARPReply(NodeConnector p, byte[] sMAC, InetAddress sIP,
149             byte[] tMAC, InetAddress tIP) {
150         byte[] senderIP = sIP.getAddress();
151         byte[] targetIP = tIP.getAddress();
152         ARP arp = new ARP();
153         arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
154             .setProtocolType(EtherTypes.IPv4.shortValue())
155             .setHardwareAddressLength((byte) 6)
156             .setProtocolAddressLength((byte) 4)
157             .setOpCode(ARP.REPLY)
158             .setSenderHardwareAddress(sMAC)
159             .setSenderProtocolAddress(senderIP)
160             .setTargetHardwareAddress(tMAC)
161             .setTargetProtocolAddress(targetIP);
162
163         Ethernet ethernet = new Ethernet();
164         ethernet.setSourceMACAddress(sMAC)
165             .setDestinationMACAddress(tMAC)
166             .setEtherType(EtherTypes.ARP.shortValue())
167             .setPayload(arp);
168
169         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
170         destPkt.setOutgoingNodeConnector(p);
171
172         this.dataPacketService.transmitDataPacket(destPkt);
173     }
174
175     protected void handleARPPacket(Ethernet eHeader, ARP pkt, NodeConnector p) {
176
177         byte[] sourceMAC = eHeader.getSourceMACAddress();
178         byte[] targetMAC = eHeader.getDestinationMACAddress();
179         /*
180          * Sanity Check; drop ARP packets originated by the controller itself.
181          * This is to avoid continuous flooding
182          */
183         if (Arrays.equals(sourceMAC, getControllerMAC())) {
184             if (log.isDebugEnabled()) {
185                 log.debug("Receive a self originated ARP pkt (srcMAC {}) --> DROP",
186                         HexEncode.bytesToHexString(sourceMAC));
187             }
188             return;
189         }
190
191         InetAddress targetIP, sourceIP;
192         try {
193             targetIP = InetAddress.getByAddress(pkt.getTargetProtocolAddress());
194             sourceIP = InetAddress.getByAddress(pkt.getSenderProtocolAddress());
195         } catch (UnknownHostException e1) {
196             log.debug("Invalid host in ARP packet: {}", e1.getMessage());
197             return;
198         }
199
200         Subnet subnet = null;
201         if (switchManager != null) {
202             subnet = switchManager.getSubnetByNetworkAddress(sourceIP);
203         }
204         if (subnet == null) {
205             log.debug("ARPHandler: can't find subnet matching {}, drop packet", sourceIP);
206             return;
207         }
208
209         // Make sure that the host is a legitimate member of this subnet
210         if (!subnet.hasNodeConnector(p)) {
211             log.debug("{} showing up on {} does not belong to {}",
212                     new Object[] { sourceIP, p, subnet });
213             return;
214         }
215
216         HostNodeConnector requestor = null;
217         if (NetUtils.isUnicastMACAddr(sourceMAC) && p.getNode() != null) {
218             try {
219                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
220             } catch (ConstructionException e) {
221                 log.debug("Received ARP packet with invalid MAC: {}", sourceMAC);
222                 return;
223             }
224             /*
225              * Learn host from the received ARP REQ/REPLY, inform Host Tracker
226              */
227             log.trace("Inform Host tracker of new host {}", requestor.getNetworkAddress());
228             for (IfHostListener listener : this.hostListeners) {
229                 listener.hostListener(requestor);
230             }
231         }
232
233         /*
234          * OpCode != request -> ARP Reply. If there are hosts (in
235          * arpRequestors) waiting for the ARP reply for this sourceIP, it's
236          * time to generate the reply and send it to these hosts.
237          *
238          * If sourceIP==targetIP, it is a Gratuitous ARP. If there are hosts (in
239          * arpRequestors) waiting for the ARP reply for this sourceIP, it's time
240          * to generate the reply and send it to these hosts
241          */
242
243         if (pkt.getOpCode() != ARP.REQUEST || sourceIP.equals(targetIP)) {
244             // Raise a reply event so that any waiting requestors will be sent a reply
245             // the true value indicates we should generate replies to requestors across the cluster
246             log.trace("Received ARP reply packet from {}, reply to all requestors.", sourceIP);
247             arpRequestReplyEvent.put(new ARPReply(sourceIP, sourceMAC), true);
248             return;
249         }
250
251         /*
252          * ARP Request Handling:
253          * If targetIP is the IP of the subnet, reply with ARP REPLY
254          * If targetIP is a known host, PROXY ARP (by sending ARP REPLY) on behalf of known target hosts.
255          * For unknown target hosts, generate and send an ARP request to ALL switches/ports using
256          * the IP address defined in the subnet as source address
257          */
258         /*
259          * If target IP is gateway IP, Send ARP reply
260          */
261         if ((targetIP.equals(subnet.getNetworkAddress()))
262                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
263             if (connectionManager.isLocal(p.getNode())){
264                 if (log.isTraceEnabled()){
265                     log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}", getControllerMAC());
266                 }
267                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
268             } else {
269                 log.trace("Received non-local ARP req. for default gateway. Raising reply event");
270                 arpRequestReplyEvent.put(
271                         new ARPReply(p, targetIP, getControllerMAC(), sourceIP, pkt.getSenderHardwareAddress()), false);
272             }
273             return;
274         }
275
276
277         HostNodeConnector host = hostTracker.hostQuery(targetIP);
278         // unknown host, initiate ARP request
279         if (host == null) {
280             // add the requestor to the list so that we can replay the reply
281             // when the host responds
282             if (requestor != null) {
283                 Set<HostNodeConnector> requestorSet = arpRequestors.get(targetIP);
284                 if (requestorSet == null) {
285                     requestorSet = Collections.newSetFromMap(new ConcurrentHashMap<HostNodeConnector, Boolean>());
286                     arpRequestors.put(targetIP, requestorSet);
287                 }
288                 requestorSet.add(requestor);
289                 countDownTimers.put(targetIP, (short) 2); // reset timeout to 2sec
290             }
291             //Raise a bcast request event, all controllers need to send one
292             log.trace("Sending a bcast ARP request for {}", targetIP);
293             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
294
295         } else {
296             /*
297              * Target host known (across the cluster), send ARP REPLY make sure that targetMAC
298              * matches the host's MAC if it is not broadcastMAC
299              */
300             if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) {
301                 log.trace("Received ARP req. for known host {}, sending reply...", targetIP);
302                 if (connectionManager.isLocal(p.getNode())) {
303                     sendARPReply(p,
304                             host.getDataLayerAddressBytes(),
305                             host.getNetworkAddress(),
306                             pkt.getSenderHardwareAddress(),
307                             sourceIP);
308                 } else {
309                     arpRequestReplyEvent.put(new ARPReply(
310                             p,
311                             host.getNetworkAddress(),
312                             host.getDataLayerAddressBytes(),
313                             sourceIP,
314                             pkt.getSenderHardwareAddress()), false);
315                 }
316             } else {
317                 /*
318                  * Target MAC has been changed. For now, discard it.
319                  * TODO: We may need to send unicast ARP REQUEST on behalf of
320                  * the target back to the sender to trigger the sender to update
321                  * its table
322                  */
323             }
324         }
325     }
326
327     /*
328      *  Send a broadcast ARP Request to the switch/ ports  using
329      *  the networkAddress of the subnet as sender IP
330      *  the controller's MAC as sender MAC
331      *  the targetIP as the target Network Address
332      */
333     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
334         Set<NodeConnector> nodeConnectors;
335         if (subnet.isFlatLayer2()) {
336             nodeConnectors = new HashSet<NodeConnector>();
337             for (Node n : this.switchManager.getNodes()) {
338                 nodeConnectors.addAll(this.switchManager.getUpNodeConnectors(n));
339             }
340         } else {
341             nodeConnectors = subnet.getNodeConnectors();
342         }
343
344         for (NodeConnector p : nodeConnectors) {
345
346             //fiter out any non-local or internal ports
347             if (! connectionManager.isLocal(p.getNode()) || topologyManager.isInternal(p)) {
348                 continue;
349             }
350             ARP arp = new ARP();
351             byte[] senderIP = subnet.getNetworkAddress().getAddress();
352             byte[] targetIPByte = targetIP.getAddress();
353             arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
354                .setProtocolType(EtherTypes.IPv4.shortValue())
355                .setHardwareAddressLength((byte) 6)
356                .setProtocolAddressLength((byte) 4)
357                .setOpCode(ARP.REQUEST)
358                .setSenderHardwareAddress(getControllerMAC())
359                .setSenderProtocolAddress(senderIP)
360                .setTargetHardwareAddress(
361                        new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 })
362                .setTargetProtocolAddress(targetIPByte);
363
364             Ethernet ethernet = new Ethernet();
365             ethernet.setSourceMACAddress(getControllerMAC())
366                     .setDestinationMACAddress(new byte[] {(byte) -1,
367                                                           (byte) -1,
368                                                           (byte) -1,
369                                                           (byte) -1,
370                                                           (byte) -1,
371                                                           (byte) -1 })
372                     .setEtherType(EtherTypes.ARP.shortValue()).setPayload(arp);
373
374             // TODO For now send port-by-port, see how to optimize to
375             // send to multiple ports at once
376             RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
377             destPkt.setOutgoingNodeConnector(p);
378
379             this.dataPacketService.transmitDataPacket(destPkt);
380         }
381     }
382
383     /*
384      * Send a unicast ARP Request to the known host on a specific switch/port as
385      * defined in the host.
386      * The sender IP is the networkAddress of the subnet
387      * The sender MAC is the controller's MAC
388      */
389     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
390
391         NodeConnector outPort = host.getnodeConnector();
392         if (outPort == null) {
393             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
394             return;
395         }
396
397         byte[] senderIP = subnet.getNetworkAddress().getAddress();
398         byte[] targetIP = host.getNetworkAddress().getAddress();
399         byte[] targetMAC = host.getDataLayerAddressBytes();
400         ARP arp = new ARP();
401         arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
402             .setProtocolType(EtherTypes.IPv4.shortValue())
403             .setHardwareAddressLength((byte) 6)
404             .setProtocolAddressLength((byte) 4)
405             .setOpCode(ARP.REQUEST)
406             .setSenderHardwareAddress(getControllerMAC())
407             .setSenderProtocolAddress(senderIP)
408             .setTargetHardwareAddress(targetMAC)
409             .setTargetProtocolAddress(targetIP);
410
411         Ethernet ethernet = new Ethernet();
412         ethernet.setSourceMACAddress(getControllerMAC())
413                 .setDestinationMACAddress(targetMAC)
414                 .setEtherType(EtherTypes.ARP.shortValue())
415                 .setPayload(arp);
416
417         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
418         destPkt.setOutgoingNodeConnector(outPort);
419
420         this.dataPacketService.transmitDataPacket(destPkt);
421     }
422
423     public void find(InetAddress networkAddress) {
424         log.trace("Received find IP {}", networkAddress);
425
426         Subnet subnet = null;
427         if (switchManager != null) {
428             subnet = switchManager.getSubnetByNetworkAddress(networkAddress);
429         }
430         if (subnet == null) {
431             log.debug("Can't find subnet matching IP {}", networkAddress);
432             return;
433         }
434
435         // send a broadcast ARP Request to this IP
436         arpRequestReplyEvent.put(new ARPRequest(networkAddress, subnet), false);
437     }
438
439     /*
440      * Probe the host by sending a unicast ARP Request to the host
441      */
442     public void probe(HostNodeConnector host) {
443         log.trace("Received probe host {}", host);
444
445         Subnet subnet = null;
446         if (switchManager != null) {
447             subnet = switchManager.getSubnetByNetworkAddress(host
448                     .getNetworkAddress());
449         }
450         if (subnet == null) {
451             log.debug("can't find subnet matching {}", host.getNetworkAddress());
452             return;
453         }
454
455         if (connectionManager.isLocal(host.getnodeconnectorNode())){
456             log.trace("Send a ucast ARP req. to: {}", host);
457             sendUcastARPRequest(host, subnet);
458         } else {
459             log.trace("Raise a ucast ARP req. event to: {}", host);
460             arpRequestReplyEvent.put(new ARPRequest(host, subnet), false);
461         }
462     }
463
464     /*
465      * An IP packet is punted to the controller, this means that the
466      * destination host is not known to the controller.
467      * Need to discover it by sending a Broadcast ARP Request
468      */
469     protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) {
470
471         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
472         if (dIP == null) {
473            return;
474         }
475
476         Subnet subnet = null;
477         if (switchManager != null) {
478             subnet = switchManager.getSubnetByNetworkAddress(dIP);
479         }
480         if (subnet == null) {
481             log.debug("Can't find subnet matching {}, drop packet", dIP);
482             return;
483         }
484         log.trace("Punted IP pkt from {}, sending bcast ARP event...", dIP);
485         /*
486          * unknown destination host, initiate bcast ARP request
487          */
488         arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false);
489         return;
490     }
491
492     public byte[] getControllerMAC() {
493         if (switchManager == null) {
494             return null;
495         }
496         return switchManager.getControllerMAC();
497     }
498
499     /**
500      * Function called by the dependency manager when all the required
501      * dependencies are satisfied
502      *
503      */
504     void init() {
505         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
506         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
507
508         allocateCaches();
509         retrieveCaches();
510     }
511
512     @SuppressWarnings({ "unchecked", "deprecation" })
513     private void retrieveCaches() {
514         ConcurrentMap<?,?> map;
515
516         if (this.clusterContainerService == null){
517             log.error("Cluster service unavailable, can't retieve ARPHandler caches!");
518             return;
519         }
520
521         map = clusterContainerService.getCache(ARP_EVENT_CACHE_NAME);
522         if (map != null){
523             this.arpRequestReplyEvent = (ConcurrentMap<ARPEvent, Boolean>) map;
524         } else {
525             log.error("Cache allocation failed for {}", ARP_EVENT_CACHE_NAME);
526         }
527     }
528
529     @SuppressWarnings("deprecation")
530     private void allocateCaches() {
531         if (clusterContainerService == null){
532             nonClusterObjectCreate();
533             log.error("Clustering service unavailable. Allocated non-cluster caches for ARPHandler.");
534             return;
535         }
536
537         try{
538             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
539                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
540         } catch (CacheConfigException e){
541             log.error("ARPHandler cache configuration invalid!");
542         } catch (CacheExistException e){
543             log.debug("ARPHandler cache exists, skipped allocation.");
544         }
545
546     }
547
548     private void nonClusterObjectCreate(){
549         arpRequestReplyEvent = new ConcurrentHashMap<ARPEvent, Boolean>();
550     }
551     /**
552      * Function called by the dependency manager when at least one
553      * dependency become unsatisfied or when the component is shutting
554      * down because for example bundle is being stopped.
555      *
556      */
557     void destroy() {
558     }
559
560     /**
561      * Function called by dependency manager after "init ()" is called
562      * and after the services provided by the class are registered in
563      * the service registry
564      *
565      */
566     void start() {
567         startPeriodicTimer();
568     }
569
570     /**
571      * Function called by the dependency manager before the services
572      * exported by the component are unregistered, this will be
573      * followed by a "destroy ()" calls
574      *
575      */
576     void stop(){
577     }
578
579     void stopping() {
580         cancelPeriodicTimer();
581     }
582
583     void setSwitchManager(ISwitchManager s) {
584         log.debug("SwitchManager service set.");
585         this.switchManager = s;
586     }
587
588     void unsetSwitchManager(ISwitchManager s) {
589         if (this.switchManager == s) {
590             log.debug("SwitchManager service UNset.");
591             this.switchManager = null;
592         }
593     }
594
595     @Override
596     public PacketResult receiveDataPacket(RawPacket inPkt) {
597         if (inPkt == null) {
598             return PacketResult.IGNORED;
599         }
600         log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
601         Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
602         if (formattedPak instanceof Ethernet) {
603             Object nextPak = formattedPak.getPayload();
604             if (nextPak instanceof IPv4) {
605                 log.trace("Handle IP packet: {}", formattedPak);
606                 handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
607             } else if (nextPak instanceof ARP) {
608                 log.trace("Handle ARP packet: {}", formattedPak);
609                 handleARPPacket((Ethernet) formattedPak, (ARP) nextPak, inPkt
610                         .getIncomingNodeConnector());
611             }
612         }
613         return PacketResult.IGNORED;
614     }
615
616     private void startPeriodicTimer() {
617         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
618         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
619             @SuppressWarnings("deprecation")
620             @Override
621             public void run() {
622                 Set<InetAddress> targetIPs = countDownTimers.keySet();
623                 Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
624                 for (InetAddress t : targetIPs) {
625                     short tick = countDownTimers.get(t);
626                     tick--;
627                     if (tick <= 0) {
628                         expiredTargets.add(t);
629                     } else {
630                         countDownTimers.replace(t, tick);
631                     }
632                 }
633                 for (InetAddress tIP : expiredTargets) {
634                     countDownTimers.remove(tIP);
635                     // Remove the requestor(s) who have been waiting for the ARP
636                     // reply from this target for more than 1sec
637                     arpRequestors.remove(tIP);
638                     log.debug("ARP reply was not received from {}", tIP);
639                 }
640
641                 // Clean up ARP event cache
642                 try {
643                     if (clusterContainerService.amICoordinator() && ! arpRequestReplyEvent.isEmpty()){
644                         arpRequestReplyEvent.clear();
645                     }
646                 } catch (Exception e){
647                     log.warn("ARPHandler: A cluster member failed to clear event cache.");
648                 }
649             }
650         }, 0, 1000);
651     }
652
653     private void cancelPeriodicTimer() {
654         if (this.periodicTimer != null) {
655             this.periodicTimer.cancel();
656         }
657     }
658
659     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
660         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
661         if ((hosts == null) || hosts.isEmpty()) {
662             return;
663         }
664         countDownTimers.remove(sourceIP);
665         for (HostNodeConnector host : hosts) {
666             log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
667                     new Object[] { sourceMAC, sourceIP, host.getDataLayerAddressBytes(), host.getNetworkAddress() });
668
669             if (connectionManager.isLocal(host.getnodeconnectorNode())){
670                 sendARPReply(host.getnodeConnector(),
671                         sourceMAC,
672                         sourceIP,
673                         host.getDataLayerAddressBytes(),
674                         host.getNetworkAddress());
675             } else {
676                 arpRequestReplyEvent.put(
677                         new ARPReply(
678                             host.getnodeConnector(),
679                             sourceIP,
680                             sourceMAC,
681                             host.getNetworkAddress(),
682                             host.getDataLayerAddressBytes()), false);
683             }
684         }
685     }
686
687
688     @Override
689     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
690         if (key instanceof ARPRequest) {
691             ARPRequest req = (ARPRequest) key;
692             // If broadcast request
693             if (req.getHost() == null) {
694                 sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
695
696             //If unicast and local, send reply
697             } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
698                 sendUcastARPRequest(req.getHost(), req.getSubnet());
699             }
700         } else if (key instanceof ARPReply) {
701             ARPReply rep = (ARPReply) key;
702             // New reply received by controller, notify all awaiting requestors across the cluster
703             if (new_value) {
704                 generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
705
706             // Otherwise, a specific reply. If local, send out.
707             } else if (connectionManager.isLocal(rep.getPort().getNode())) {
708                 sendARPReply(rep.getPort(),
709                         rep.getSourceMac(),
710                         rep.getSourceIP(),
711                         rep.getTargetMac(),
712                         rep.getTargetIP());
713             }
714         }
715     }
716
717     @Override
718     public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
719         // nothing to do
720     }
721     @Override
722     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
723         // nothing to do
724     }
725 }