Merge "Fix clustering versions"
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 /*
10  *
11  */
12 package org.opendaylight.controller.arphandler.internal;
13
14 import java.net.InetAddress;
15 import java.net.UnknownHostException;
16 import java.util.Arrays;
17 import java.util.Collections;
18 import java.util.EnumSet;
19 import java.util.HashSet;
20 import java.util.Set;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.CopyOnWriteArraySet;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.opendaylight.controller.arphandler.ARPCacheEvent;
30 import org.opendaylight.controller.arphandler.ARPEvent;
31 import org.opendaylight.controller.arphandler.ARPReply;
32 import org.opendaylight.controller.arphandler.ARPRequest;
33 import org.opendaylight.controller.clustering.services.CacheConfigException;
34 import org.opendaylight.controller.clustering.services.CacheExistException;
35 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
36 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
37 import org.opendaylight.controller.clustering.services.IClusterServices;
38 import org.opendaylight.controller.connectionmanager.IConnectionManager;
39 import org.opendaylight.controller.hosttracker.HostIdFactory;
40 import org.opendaylight.controller.hosttracker.IHostId;
41 import org.opendaylight.controller.hosttracker.IfHostListener;
42 import org.opendaylight.controller.hosttracker.IfIptoHost;
43 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
44 import org.opendaylight.controller.hosttracker.hostAware.IHostFinder;
45 import org.opendaylight.controller.sal.connection.ConnectionLocality;
46 import org.opendaylight.controller.sal.core.ConstructionException;
47 import org.opendaylight.controller.sal.core.Node;
48 import org.opendaylight.controller.sal.core.NodeConnector;
49 import org.opendaylight.controller.sal.packet.ARP;
50 import org.opendaylight.controller.sal.packet.Ethernet;
51 import org.opendaylight.controller.sal.packet.IDataPacketService;
52 import org.opendaylight.controller.sal.packet.IListenDataPacket;
53 import org.opendaylight.controller.sal.packet.IPv4;
54 import org.opendaylight.controller.sal.packet.Packet;
55 import org.opendaylight.controller.sal.packet.PacketResult;
56 import org.opendaylight.controller.sal.packet.RawPacket;
57 import org.opendaylight.controller.sal.routing.IRouting;
58 import org.opendaylight.controller.sal.utils.EtherTypes;
59 import org.opendaylight.controller.sal.utils.HexEncode;
60 import org.opendaylight.controller.sal.utils.NetUtils;
61 import org.opendaylight.controller.switchmanager.ISwitchManager;
62 import org.opendaylight.controller.switchmanager.Subnet;
63 import org.opendaylight.controller.topologymanager.ITopologyManager;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateAware<ARPEvent, Boolean> {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(ArpHandler.class);
    // Name under which the ARP event cache is created in and fetched from the cluster service.
    static final String ARP_EVENT_CACHE_NAME = "arphandler.arpRequestReplyEvent";
    // Collaborating services. Each one is injected and cleared through the
    // matching set*/unset* method of this class and may therefore be null.
    private IfIptoHost hostTracker;
    private ISwitchManager switchManager;
    private ITopologyManager topologyManager;
    private IDataPacketService dataPacketService;
    private IRouting routing;
    private IClusterContainerServices clusterContainerService;
    private IConnectionManager connectionManager;
    // Listeners informed about every host learned from a received ARP packet.
    private Set<IfHostListener> hostListeners = new CopyOnWriteArraySet<IfHostListener>();
    // Hosts waiting for an ARP reply, keyed by the target IP they asked for.
    private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
    // Remaining timer ticks per target IP before its waiting requestors are dropped.
    private ConcurrentMap<InetAddress, Short> countDownTimers;
    // Timer that runs the periodic expiry / event-cache cleanup task.
    private Timer periodicTimer;
    // Queue of ARP cache events; presumably drained by cacheEventHandler (consumer not visible here).
    private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
    // Thread running ARPCacheEventHandler: created in init(), started in start(), interrupted in destroy().
    private Thread cacheEventHandler;
    // Set to true by stopping() and reset to false by start().
    private boolean stopping = false;

    /*
     * A cluster allocated cache. Used for synchronizing ARP request/reply
     * events across all cluster controllers. To raise an event, we put() a
     * specific event object (as key) and all nodes handle it in the
     * entryUpdated callback.
     *
     * In case of ARPReply, we put true value to send replies to any requestors
     * by calling generateAndSendReply
     */
    private ConcurrentMap<ARPEvent, Boolean> arpRequestReplyEvent;
95
96     void setConnectionManager(IConnectionManager cm) {
97         this.connectionManager = cm;
98     }
99
100     void unsetConnectionManager(IConnectionManager cm) {
101         if (this.connectionManager == cm) {
102             connectionManager = null;
103         }
104     }
105
106     void setClusterContainerService(IClusterContainerServices s) {
107         this.clusterContainerService = s;
108     }
109
110     void unsetClusterContainerService(IClusterContainerServices s) {
111         if (this.clusterContainerService == s) {
112             this.clusterContainerService = null;
113         }
114     }
115
116     void setRouting(IRouting r) {
117         this.routing = r;
118     }
119
120     void unsetRouting(IRouting r) {
121         if (this.routing == r) {
122             this.routing = null;
123         }
124     }
125
126     void setHostListener(IfHostListener s) {
127         if (this.hostListeners != null) {
128             this.hostListeners.add(s);
129         }
130     }
131
132     void unsetHostListener(IfHostListener s) {
133         if (this.hostListeners != null) {
134             this.hostListeners.remove(s);
135         }
136     }
137
138     void setDataPacketService(IDataPacketService s) {
139         this.dataPacketService = s;
140     }
141
142     void unsetDataPacketService(IDataPacketService s) {
143         if (this.dataPacketService == s) {
144             this.dataPacketService = null;
145         }
146     }
147
148     public void setHostTracker(IfIptoHost hostTracker) {
149         log.debug("Setting HostTracker");
150         this.hostTracker = hostTracker;
151     }
152
153     public void unsetHostTracker(IfIptoHost s) {
154         log.debug("UNSetting HostTracker");
155         if (this.hostTracker == s) {
156             this.hostTracker = null;
157         }
158     }
159
160     public void setTopologyManager(ITopologyManager tm) {
161         this.topologyManager = tm;
162     }
163
164     public void unsetTopologyManager(ITopologyManager tm) {
165         if (this.topologyManager == tm) {
166             this.topologyManager = null;
167         }
168     }
169
170     protected void sendARPReply(NodeConnector p, byte[] sMAC, InetAddress sIP, byte[] tMAC, InetAddress tIP) {
171         byte[] senderIP = sIP.getAddress();
172         byte[] targetIP = tIP.getAddress();
173         ARP arp = createARP(ARP.REPLY, sMAC, senderIP, tMAC, targetIP);
174
175         if(log.isTraceEnabled()) {
176             log.trace("Sending Arp Reply with srcMac {} - srcIp {} - dstMac {} - dstIp {} - outport {}",
177                     HexEncode.bytesToHexString(sMAC),
178                     sIP, HexEncode.bytesToHexString(tMAC), tIP, p);
179         }
180
181         Ethernet ethernet = createEthernet(sMAC, tMAC, arp);
182
183         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
184         destPkt.setOutgoingNodeConnector(p);
185
186         this.dataPacketService.transmitDataPacket(destPkt);
187     }
188
189     private void logArpPacket(ARP pkt, NodeConnector p) {
190         try {
191             if (pkt.getOpCode() == ARP.REQUEST) {
192                 log.trace("Received Arp Request with srcMac {} - srcIp {} - dstMac {} - dstIp {} - inport {}", HexEncode.bytesToHexString(pkt.getSenderHardwareAddress()),
193                         InetAddress.getByAddress(pkt.getSenderProtocolAddress()), HexEncode.bytesToHexString(pkt.getTargetHardwareAddress()),
194                         InetAddress.getByAddress(pkt.getTargetProtocolAddress()), p);
195             } else if(pkt.getOpCode() == ARP.REPLY) {
196                 log.trace("Received Arp Reply with srcMac {} - srcIp {} - dstMac {} - dstIp {} - inport {}", HexEncode.bytesToHexString(pkt.getSenderHardwareAddress()),
197                         InetAddress.getByAddress(pkt.getSenderProtocolAddress()), HexEncode.bytesToHexString(pkt.getTargetHardwareAddress()),
198                         InetAddress.getByAddress(pkt.getTargetProtocolAddress()), p);
199             }
200         } catch(UnknownHostException e) {
201             log.warn("Illegal Ip Address in the ARP packet", e);
202         }
203     }
204
205     protected void handleARPPacket(Ethernet eHeader, ARP pkt, NodeConnector p) {
206
207         if(log.isTraceEnabled()) {
208             logArpPacket(pkt, p);
209         }
210
211         byte[] sourceMAC = eHeader.getSourceMACAddress();
212         byte[] targetMAC = eHeader.getDestinationMACAddress();
213         /*
214          * Sanity Check; drop ARP packets originated by the controller itself.
215          * This is to avoid continuous flooding
216          */
217         if (Arrays.equals(sourceMAC, getControllerMAC())) {
218             if (log.isDebugEnabled()) {
219                 log.debug("Receive a self originated ARP pkt (srcMAC {}) --> DROP",
220                         HexEncode.bytesToHexString(sourceMAC));
221             }
222             return;
223         }
224
225         InetAddress targetIP, sourceIP;
226         try {
227             targetIP = InetAddress.getByAddress(pkt.getTargetProtocolAddress());
228             sourceIP = InetAddress.getByAddress(pkt.getSenderProtocolAddress());
229         } catch (UnknownHostException e1) {
230             log.debug("Invalid host in ARP packet: {}", e1.getMessage());
231             return;
232         }
233
234         Subnet subnet = null;
235         if (switchManager != null) {
236             subnet = switchManager.getSubnetByNetworkAddress(sourceIP);
237         }
238         if (subnet == null) {
239             log.debug("ARPHandler: can't find subnet matching {}, drop packet", sourceIP);
240             return;
241         }
242
243         // Make sure that the host is a legitimate member of this subnet
244         if (!subnet.hasNodeConnector(p)) {
245             log.debug("{} showing up on {} does not belong to {}", new Object[] { sourceIP, p, subnet });
246             return;
247         }
248
249         HostNodeConnector requestor = null;
250         if (NetUtils.isUnicastMACAddr(sourceMAC) && p.getNode() != null) {
251             try {
252                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
253             } catch (ConstructionException e) {
254                 log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC));
255                 return;
256             }
257             /*
258              * Learn host from the received ARP REQ/REPLY, inform Host Tracker
259              */
260             log.trace("Inform Host tracker of new host {}", requestor.getNetworkAddress());
261             for (IfHostListener listener : this.hostListeners) {
262                 listener.hostListener(requestor);
263             }
264         }
265
266         /*
267          * OpCode != request -> ARP Reply. If there are hosts (in arpRequestors)
268          * waiting for the ARP reply for this sourceIP, it's time to generate
269          * the reply and send it to these hosts.
270          *
271          * If sourceIP==targetIP, it is a Gratuitous ARP. If there are hosts (in
272          * arpRequestors) waiting for the ARP reply for this sourceIP, it's time
273          * to generate the reply and send it to these hosts
274          */
275
276         if (pkt.getOpCode() != ARP.REQUEST || sourceIP.equals(targetIP)) {
277             // Raise a reply event so that any waiting requestors will be sent a
278             // reply
279             // the true value indicates we should generate replies to requestors
280             // across the cluster
281             log.trace("Received ARP reply packet from {}, reply to all requestors.", sourceIP);
282             arpRequestReplyEvent.put(new ARPReply(sourceIP, sourceMAC), true);
283             return;
284         }
285
286         /*
287          * ARP Request Handling: If targetIP is the IP of the subnet, reply with
288          * ARP REPLY If targetIP is a known host, PROXY ARP (by sending ARP
289          * REPLY) on behalf of known target hosts. For unknown target hosts,
290          * generate and send an ARP request to ALL switches/ports using the IP
291          * address defined in the subnet as source address
292          */
293         /*
294          * If target IP is gateway IP, Send ARP reply
295          */
296         if ((targetIP.equals(subnet.getNetworkAddress()))
297                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
298             if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) {
299                 if (log.isTraceEnabled()) {
300                     log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}",
301                             HexEncode.bytesToHexString(getControllerMAC()));
302                 }
303                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
304             } else {
305                 log.trace("Received non-local ARP req. for default gateway. Raising reply event");
306                 arpRequestReplyEvent.put(
307                         new ARPReply(p, targetIP, getControllerMAC(), sourceIP, pkt.getSenderHardwareAddress()), false);
308             }
309             return;
310         }
311
312         // Hosttracker hosts db key implementation
313         IHostId id = HostIdFactory.create(targetIP, null);
314         HostNodeConnector host = hostTracker.hostQuery(id);
315         // unknown host, initiate ARP request
316         if (host == null) {
317             // add the requestor to the list so that we can replay the reply
318             // when the host responds
319             if (requestor != null) {
320                 Set<HostNodeConnector> requestorSet = arpRequestors.get(targetIP);
321                 if (requestorSet == null) {
322                     requestorSet = Collections.newSetFromMap(new ConcurrentHashMap<HostNodeConnector, Boolean>());
323                     arpRequestors.put(targetIP, requestorSet);
324                 }
325                 requestorSet.add(requestor);
326                 countDownTimers.put(targetIP, (short) 2); // reset timeout to
327                                                           // 2sec
328             }
329             // Raise a bcast request event, all controllers need to send one
330             log.trace("Sending a bcast ARP request for {}", targetIP);
331             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
332         } else {
333             /*
334              * Target host known (across the cluster), send ARP REPLY make sure
335              * that targetMAC matches the host's MAC if it is not broadcastMAC
336              */
337             if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) {
338                 log.trace("Received ARP req. for known host {}, sending reply...", targetIP);
339                 if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) {
340                     sendARPReply(p, host.getDataLayerAddressBytes(), host.getNetworkAddress(),
341                             pkt.getSenderHardwareAddress(), sourceIP);
342                 } else {
343                     arpRequestReplyEvent.put(new ARPReply(p, host.getNetworkAddress(), host.getDataLayerAddressBytes(),
344                             sourceIP, pkt.getSenderHardwareAddress()), false);
345                 }
346             } else {
347                 /*
348                  * Target MAC has been changed. For now, discard it. TODO: We
349                  * may need to send unicast ARP REQUEST on behalf of the target
350                  * back to the sender to trigger the sender to update its table
351                  */
352             }
353         }
354     }
355
356     /**
357      * Send a broadcast ARP Request to the switch/ ports using the
358      * networkAddress of the subnet as sender IP the controller's MAC as sender
359      * MAC the targetIP as the target Network Address
360      */
361     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
362         log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet);
363         Set<NodeConnector> nodeConnectors;
364         if (subnet.isFlatLayer2()) {
365             nodeConnectors = new HashSet<NodeConnector>();
366             for (Node n : this.switchManager.getNodes()) {
367                 nodeConnectors.addAll(this.switchManager.getUpNodeConnectors(n));
368             }
369         } else {
370             nodeConnectors = subnet.getNodeConnectors();
371         }
372         byte[] targetHardwareAddress = new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
373
374         // TODO: should use IBroadcastHandler instead
375         for (NodeConnector p : nodeConnectors) {
376             // filter out any non-local or internal ports
377             if (!(connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL)
378                     || topologyManager.isInternal(p)) {
379                 continue;
380             }
381             log.trace("Sending toward nodeConnector:{}", p);
382             byte[] senderIP = subnet.getNetworkAddress().getAddress();
383             byte[] targetIPByte = targetIP.getAddress();
384             ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetHardwareAddress, targetIPByte);
385
386             if(log.isTraceEnabled()) {
387                 log.trace("Sending Broadcast Arp Request with srcMac {} - srcIp {} - dstMac {} - dstIp {} - outport {}", HexEncode.bytesToHexString(getControllerMAC()),
388                         subnet.getNetworkAddress(), HexEncode.bytesToHexString(targetHardwareAddress), targetIP, p);
389             }
390
391             byte[] destMACAddress = NetUtils.getBroadcastMACAddr();
392             Ethernet ethernet = createEthernet(getControllerMAC(), destMACAddress, arp);
393
394             // TODO For now send port-by-port, see how to optimize to
395             // send to multiple ports at once
396             RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
397             destPkt.setOutgoingNodeConnector(p);
398
399             this.dataPacketService.transmitDataPacket(destPkt);
400         }
401     }
402
403     /**
404      * Send a unicast ARP Request to the known host on a specific switch/port as
405      * defined in the host. The sender IP is the networkAddress of the subnet
406      * The sender MAC is the controller's MAC
407      */
408     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
409         log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet);
410         NodeConnector outPort = host.getnodeConnector();
411         if (outPort == null) {
412             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
413             return;
414         }
415
416         byte[] senderIP = subnet.getNetworkAddress().getAddress();
417         byte[] targetIP = host.getNetworkAddress().getAddress();
418         byte[] targetMAC = host.getDataLayerAddressBytes();
419         ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetMAC, targetIP);
420
421         if(log.isTraceEnabled()) {
422             log.trace("Sending Unicast Arp Request with srcMac {} - srcIp {} - dstMac {} - dstIp {} - outport {}",
423                     HexEncode.bytesToHexString(getControllerMAC()),
424                     subnet.getNetworkAddress(), HexEncode.bytesToHexString(targetMAC), host.getNetworkAddress(),
425                     outPort);
426         }
427
428         Ethernet ethernet = createEthernet(getControllerMAC(), targetMAC, arp);
429
430         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
431         destPkt.setOutgoingNodeConnector(outPort);
432
433         this.dataPacketService.transmitDataPacket(destPkt);
434     }
435
436     @Override
437     public void find(InetAddress networkAddress) {
438         log.trace("Received find IP {}", networkAddress);
439
440         Subnet subnet = null;
441         if (switchManager != null) {
442             subnet = switchManager.getSubnetByNetworkAddress(networkAddress);
443         }
444         if (subnet == null) {
445             log.debug("Can't find subnet matching IP {}", networkAddress);
446             return;
447         }
448
449         // send a broadcast ARP Request to this IP
450         arpRequestReplyEvent.put(new ARPRequest(networkAddress, subnet), false);
451     }
452
453     /*
454      * Probe the host by sending a unicast ARP Request to the host
455      */
456     @Override
457     public void probe(HostNodeConnector host) {
458         log.trace("Received probe host {}", host);
459
460         Subnet subnet = null;
461         if (switchManager != null) {
462             subnet = switchManager.getSubnetByNetworkAddress(host.getNetworkAddress());
463         }
464         if (subnet == null) {
465             log.debug("can't find subnet matching {}", host.getNetworkAddress());
466             return;
467         }
468
469         if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
470             log.trace("Send a ucast ARP req. to: {}", host);
471             sendUcastARPRequest(host, subnet);
472         } else {
473             log.trace("Raise a ucast ARP req. event to: {}", host);
474             arpRequestReplyEvent.put(new ARPRequest(host, subnet), false);
475         }
476     }
477
478     /**
479      * An IP packet is punted to the controller, this means that the destination
480      * host is not known to the controller. Need to discover it by sending a
481      * Broadcast ARP Request
482      *
483      * @param pkt
484      * @param p
485      */
486     protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) {
487
488         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
489         if (dIP == null) {
490             return;
491         }
492
493         // try to find a matching subnet
494         Subnet subnet = null;
495         if (switchManager != null) {
496             subnet = switchManager.getSubnetByNetworkAddress(dIP);
497         }
498         if (subnet == null) {
499             log.debug("Can't find subnet matching {}, drop packet", dIP);
500             return;
501         }
502         // If packet is sent to the default gw (us), ignore it for now
503         if (subnet.getNetworkAddress().equals(dIP)) {
504             log.trace("Ignore IP packet destined to default gw");
505             return;
506         }
507
508         // see if we know about the host
509         // Hosttracker hosts db key implementation
510         IHostId id = HostIdFactory.create(dIP, null);
511         HostNodeConnector host = hostTracker.hostFind(id);
512
513         if (host == null) {
514             // if we don't, know about the host, try to find it
515             log.trace("Punted IP pkt to {}, sending bcast ARP event...", dIP);
516             /*
517              * unknown destination host, initiate bcast ARP request
518              */
519             arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false);
520
521         } else if (routing == null || routing.getRoute(p.getNode(), host.getnodeconnectorNode()) != null) {
522             /*
523              * if IRouting is available, make sure that this packet can get it's
524              * destination normally before teleporting it there. If it's not
525              * available, then assume it's reachable.
526              *
527              * TODO: come up with a way to do this in the absence of IRouting
528              */
529
530             log.trace("forwarding punted IP pkt to {} received at {}", dIP, p);
531
532             /*
533              * if we know where the host is and there's a path from where this
534              * packet was punted to where the host is, then deliver it to the
535              * host for now
536              */
537             NodeConnector nc = host.getnodeConnector();
538
539             // re-encode the Ethernet packet (the parent of the IPv4 packet)
540             RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent());
541             rp.setOutgoingNodeConnector(nc);
542             this.dataPacketService.transmitDataPacket(rp);
543         } else {
544             log.trace("ignoring punted IP pkt to {} because there is no route from {}", dIP, p);
545         }
546     }
547
548     public byte[] getControllerMAC() {
549         if (switchManager == null) {
550             return null;
551         }
552         return switchManager.getControllerMAC();
553     }
554
555     /**
556      * Function called by the dependency manager when all the required
557      * dependencies are satisfied
558      *
559      */
560     void init() {
561         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
562         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
563         cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
564
565         allocateCaches();
566         retrieveCaches();
567
568     }
569
570     @SuppressWarnings({ "unchecked" })
571     private void retrieveCaches() {
572         ConcurrentMap<?, ?> map;
573
574         if (this.clusterContainerService == null) {
575             log.error("Cluster service unavailable, can't retieve ARPHandler caches!");
576             return;
577         }
578
579         map = clusterContainerService.getCache(ARP_EVENT_CACHE_NAME);
580         if (map != null) {
581             this.arpRequestReplyEvent = (ConcurrentMap<ARPEvent, Boolean>) map;
582         } else {
583             log.error("Cache allocation failed for {}", ARP_EVENT_CACHE_NAME);
584         }
585     }
586
587     private void allocateCaches() {
588         if (clusterContainerService == null) {
589             nonClusterObjectCreate();
590             log.error("Clustering service unavailable. Allocated non-cluster caches for ARPHandler.");
591             return;
592         }
593
594         try {
595             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
596                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
597         } catch (CacheConfigException e) {
598             log.error("ARPHandler cache configuration invalid!");
599         } catch (CacheExistException e) {
600             log.debug("ARPHandler cache exists, skipped allocation.");
601         }
602
603     }
604
605     private void nonClusterObjectCreate() {
606         arpRequestReplyEvent = new ConcurrentHashMap<ARPEvent, Boolean>();
607     }
608
609     /**
610      * Function called by the dependency manager when at least one dependency
611      * become unsatisfied or when the component is shutting down because for
612      * example bundle is being stopped.
613      *
614      */
615     void destroy() {
616         cacheEventHandler.interrupt();
617     }
618
619     /**
620      * Function called by dependency manager after "init ()" is called and after
621      * the services provided by the class are registered in the service registry
622      *
623      */
624     void start() {
625         stopping = false;
626         startPeriodicTimer();
627         cacheEventHandler.start();
628     }
629
630     /**
631      * Function called by the dependency manager before the services exported by
632      * the component are unregistered, this will be followed by a "destroy ()"
633      * calls
634      *
635      */
636     void stop() {
637     }
638
639     void stopping() {
640         stopping = true;
641         cancelPeriodicTimer();
642     }
643
644     void setSwitchManager(ISwitchManager s) {
645         log.debug("SwitchManager service set.");
646         this.switchManager = s;
647     }
648
649     void unsetSwitchManager(ISwitchManager s) {
650         if (this.switchManager == s) {
651             log.debug("SwitchManager service UNset.");
652             this.switchManager = null;
653         }
654     }
655
656     @Override
657     public PacketResult receiveDataPacket(RawPacket inPkt) {
658         if (inPkt == null) {
659             return PacketResult.IGNORED;
660         }
661         log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
662         Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
663         if (formattedPak instanceof Ethernet) {
664             Object nextPak = formattedPak.getPayload();
665             if (nextPak instanceof IPv4) {
666                 log.trace("Handle IP packet: {}", formattedPak);
667                 handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
668             } else if (nextPak instanceof ARP) {
669                 log.trace("Handle ARP packet: {}", formattedPak);
670                 handleARPPacket((Ethernet) formattedPak, (ARP) nextPak, inPkt.getIncomingNodeConnector());
671             }
672         }
673         return PacketResult.IGNORED;
674     }
675
676     private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress,
677             byte[] targetIP) {
678         ARP arp = new ARP();
679         arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
680         arp.setProtocolType(EtherTypes.IPv4.shortValue());
681         arp.setHardwareAddressLength((byte) 6);
682         arp.setProtocolAddressLength((byte) 4);
683         arp.setOpCode(opCode);
684         arp.setSenderHardwareAddress(senderMacAddress);
685         arp.setSenderProtocolAddress(senderIP);
686         arp.setTargetHardwareAddress(targetMacAddress);
687         arp.setTargetProtocolAddress(targetIP);
688         return arp;
689     }
690
691     private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) {
692         Ethernet ethernet = new Ethernet();
693         ethernet.setSourceMACAddress(sourceMAC);
694         ethernet.setDestinationMACAddress(targetMAC);
695         ethernet.setEtherType(EtherTypes.ARP.shortValue());
696         ethernet.setPayload(arp);
697         return ethernet;
698     }
699
700     private void startPeriodicTimer() {
701         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
702         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
703             @Override
704             public void run() {
705                 Set<InetAddress> targetIPs = countDownTimers.keySet();
706                 Set<InetAddress> expiredTargets = new HashSet<InetAddress>();
707                 for (InetAddress t : targetIPs) {
708                     short tick = countDownTimers.get(t);
709                     tick--;
710                     if (tick <= 0) {
711                         expiredTargets.add(t);
712                     } else {
713                         countDownTimers.replace(t, tick);
714                     }
715                 }
716                 for (InetAddress tIP : expiredTargets) {
717                     countDownTimers.remove(tIP);
718                     // Remove the requestor(s) who have been waiting for the ARP
719                     // reply from this target for more than 1sec
720                     arpRequestors.remove(tIP);
721                     log.debug("ARP reply was not received from {}", tIP);
722                 }
723
724                 // Clean up ARP event cache
725                 try {
726                     if (clusterContainerService.amICoordinator() && !arpRequestReplyEvent.isEmpty()) {
727                         arpRequestReplyEvent.clear();
728                     }
729                 } catch (Exception e) {
730                     log.warn("ARPHandler: A cluster member failed to clear event cache.");
731                 }
732             }
733         }, 0, 1000);
734     }
735
736     private void cancelPeriodicTimer() {
737         if (this.periodicTimer != null) {
738             this.periodicTimer.cancel();
739         }
740     }
741
742     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
743         if (log.isTraceEnabled()) {
744             log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
745                     HexEncode.bytesToHexString(sourceMAC));
746         }
747         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
748         if ((hosts == null) || hosts.isEmpty()) {
749             log.trace("Bailing out no requestors Hosts");
750             return;
751         }
752         countDownTimers.remove(sourceIP);
753         for (HostNodeConnector host : hosts) {
754             if (log.isTraceEnabled()) {
755                 log.trace(
756                         "Sending ARP Reply with src {}/{}, target {}/{}",
757                         new Object[] { HexEncode.bytesToHexString(sourceMAC), sourceIP,
758                                 HexEncode.bytesToHexString(host.getDataLayerAddressBytes()), host.getNetworkAddress() });
759             }
760             if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
761                 sendARPReply(host.getnodeConnector(), sourceMAC, sourceIP, host.getDataLayerAddressBytes(),
762                         host.getNetworkAddress());
763             } else {
764                 /*
765                  * In the remote event a requestor moved to another controller
766                  * it may turn out it now we need to send the ARP reply from a
767                  * different controller, this cover the case
768                  */
769                 arpRequestReplyEvent.put(
770                         new ARPReply(host.getnodeConnector(), sourceIP, sourceMAC, host.getNetworkAddress(), host
771                                 .getDataLayerAddressBytes()), false);
772             }
773         }
774     }
775
776     @Override
777     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
778         log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
779         enqueueARPCacheEvent(key, new_value);
780     }
781
782     @Override
783     public void entryCreated(ARPEvent key, String cacheName, boolean originLocal) {
784         // nothing to do
785     }
786
787     @Override
788     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
789         // nothing to do
790     }
791
792     private void enqueueARPCacheEvent(ARPEvent event, boolean new_value) {
793         try {
794             ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
795             if (!ARPCacheEvents.contains(cacheEvent)) {
796                 this.ARPCacheEvents.add(cacheEvent);
797                 log.trace("Enqueued {}", event);
798             }
799         } catch (Exception e) {
800             log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
801         }
802     }
803
    /*
     * This thread monitors the ARPCacheEvents queue and processes each
     * incoming ARP cache event.
     */
808     private class ARPCacheEventHandler implements Runnable {
809         @Override
810         public void run() {
811             while (!stopping) {
812                 try {
813                     ARPCacheEvent ev = ARPCacheEvents.take();
814                     ARPEvent event = ev.getEvent();
815                     if (event instanceof ARPRequest) {
816                         ARPRequest req = (ARPRequest) event;
817                         // If broadcast request
818                         if (req.getHost() == null) {
819                             log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
820                             sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
821
822                             // If unicast and local, send reply
823                         } else if (connectionManager.getLocalityStatus(req.getHost().getnodeconnectorNode()) == ConnectionLocality.LOCAL) {
824                             log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
825                             sendUcastARPRequest(req.getHost(), req.getSubnet());
826                         }
827                     } else if (event instanceof ARPReply) {
828                         ARPReply rep = (ARPReply) event;
829                         // New reply received by controller, notify all awaiting
830                         // requestors across the cluster
831                         if (ev.isNewReply()) {
832                             log.trace("Trigger a generateAndSendReply in response to {}", rep);
833                             generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
834                             // Otherwise, a specific reply. If local, send out.
835                         } else if (connectionManager.getLocalityStatus(rep.getPort().getNode()) == ConnectionLocality.LOCAL) {
836                             log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
837                             sendARPReply(rep.getPort(), rep.getSourceMac(), rep.getSourceIP(), rep.getTargetMac(),
838                                     rep.getTargetIP());
839                         }
840                     }
841                 } catch (InterruptedException e) {
842                     ARPCacheEvents.clear();
843                     return;
844                 }
845             }
846         }
847     }
848 }