Make use of NetUtils.getBroadcastMacAddr()
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
index 0ff1cd9bd6a542dc8694b09933bfe2c1835d3bbd..e345d65f7e527754981de650e8c75e47a72b47d8 100644 (file)
@@ -21,10 +21,13 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.opendaylight.controller.arphandler.ARPCacheEvent;
 import org.opendaylight.controller.arphandler.ARPEvent;
 import org.opendaylight.controller.arphandler.ARPReply;
 import org.opendaylight.controller.arphandler.ARPRequest;
@@ -71,6 +74,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
     private ConcurrentMap<InetAddress, Short> countDownTimers;
     private Timer periodicTimer;
+    private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
+    private Thread cacheEventHandler;
+    private boolean stopping = false;
     /*
      * A cluster allocated cache. Used for synchronizing ARP request/reply
      * events across all cluster controllers. To raise an event, we put() a specific
@@ -149,22 +155,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
             byte[] tMAC, InetAddress tIP) {
         byte[] senderIP = sIP.getAddress();
         byte[] targetIP = tIP.getAddress();
-        ARP arp = new ARP();
-        arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
-            .setProtocolType(EtherTypes.IPv4.shortValue())
-            .setHardwareAddressLength((byte) 6)
-            .setProtocolAddressLength((byte) 4)
-            .setOpCode(ARP.REPLY)
-            .setSenderHardwareAddress(sMAC)
-            .setSenderProtocolAddress(senderIP)
-            .setTargetHardwareAddress(tMAC)
-            .setTargetProtocolAddress(targetIP);
+        ARP arp = createARP(ARP.REPLY,sMAC,senderIP,tMAC,targetIP);
 
-        Ethernet ethernet = new Ethernet();
-        ethernet.setSourceMACAddress(sMAC)
-            .setDestinationMACAddress(tMAC)
-            .setEtherType(EtherTypes.ARP.shortValue())
-            .setPayload(arp);
+        Ethernet ethernet = createEthernet(sMAC, tMAC, arp);
 
         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
         destPkt.setOutgoingNodeConnector(p);
@@ -218,7 +211,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
             try {
                 requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
             } catch (ConstructionException e) {
-                log.debug("Received ARP packet with invalid MAC: {}", sourceMAC);
+                log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC));
                 return;
             }
             /*
@@ -262,7 +255,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
                 && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
             if (connectionManager.isLocal(p.getNode())){
                 if (log.isTraceEnabled()){
-                    log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}", getControllerMAC());
+                    log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}",
+                            HexEncode.bytesToHexString(getControllerMAC()));
                 }
                 sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
             } else {
@@ -291,7 +285,6 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
             //Raise a bcast request event, all controllers need to send one
             log.trace("Sending a bcast ARP request for {}", targetIP);
             arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
-
         } else {
             /*
              * Target host known (across the cluster), send ARP REPLY make sure that targetMAC
@@ -331,6 +324,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
      *  the targetIP as the target Network Address
      */
     protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
+        log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet);
         Set<NodeConnector> nodeConnectors;
         if (subnet.isFlatLayer2()) {
             nodeConnectors = new HashSet<NodeConnector>();
@@ -340,36 +334,19 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
         } else {
             nodeConnectors = subnet.getNodeConnectors();
         }
-
+        byte[] targetHardwareAddress = new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
         for (NodeConnector p : nodeConnectors) {
-
             //fiter out any non-local or internal ports
             if (! connectionManager.isLocal(p.getNode()) || topologyManager.isInternal(p)) {
                 continue;
             }
-            ARP arp = new ARP();
+            log.trace("Sending toward nodeConnector:{}", p);
             byte[] senderIP = subnet.getNetworkAddress().getAddress();
             byte[] targetIPByte = targetIP.getAddress();
-            arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
-               .setProtocolType(EtherTypes.IPv4.shortValue())
-               .setHardwareAddressLength((byte) 6)
-               .setProtocolAddressLength((byte) 4)
-               .setOpCode(ARP.REQUEST)
-               .setSenderHardwareAddress(getControllerMAC())
-               .setSenderProtocolAddress(senderIP)
-               .setTargetHardwareAddress(
-                       new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 })
-               .setTargetProtocolAddress(targetIPByte);
-
-            Ethernet ethernet = new Ethernet();
-            ethernet.setSourceMACAddress(getControllerMAC())
-                    .setDestinationMACAddress(new byte[] {(byte) -1,
-                                                          (byte) -1,
-                                                          (byte) -1,
-                                                          (byte) -1,
-                                                          (byte) -1,
-                                                          (byte) -1 })
-                    .setEtherType(EtherTypes.ARP.shortValue()).setPayload(arp);
+            ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetHardwareAddress, targetIPByte);
+
+            byte[] destMACAddress = NetUtils.getBroadcastMACAddr();
+            Ethernet ethernet = createEthernet(getControllerMAC(), destMACAddress, arp);
 
             // TODO For now send port-by-port, see how to optimize to
             // send to multiple ports at once
@@ -387,7 +364,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
      * The sender MAC is the controller's MAC
      */
     protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
-
+        log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet);
         NodeConnector outPort = host.getnodeConnector();
         if (outPort == null) {
             log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
@@ -397,22 +374,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
         byte[] senderIP = subnet.getNetworkAddress().getAddress();
         byte[] targetIP = host.getNetworkAddress().getAddress();
         byte[] targetMAC = host.getDataLayerAddressBytes();
-        ARP arp = new ARP();
-        arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
-            .setProtocolType(EtherTypes.IPv4.shortValue())
-            .setHardwareAddressLength((byte) 6)
-            .setProtocolAddressLength((byte) 4)
-            .setOpCode(ARP.REQUEST)
-            .setSenderHardwareAddress(getControllerMAC())
-            .setSenderProtocolAddress(senderIP)
-            .setTargetHardwareAddress(targetMAC)
-            .setTargetProtocolAddress(targetIP);
+        ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetMAC, targetIP);
 
-        Ethernet ethernet = new Ethernet();
-        ethernet.setSourceMACAddress(getControllerMAC())
-                .setDestinationMACAddress(targetMAC)
-                .setEtherType(EtherTypes.ARP.shortValue())
-                .setPayload(arp);
+        Ethernet ethernet = createEthernet(getControllerMAC(), targetMAC, arp);
 
         RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet);
         destPkt.setOutgoingNodeConnector(outPort);
@@ -420,6 +384,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
         this.dataPacketService.transmitDataPacket(destPkt);
     }
 
+    @Override
     public void find(InetAddress networkAddress) {
         log.trace("Received find IP {}", networkAddress);
 
@@ -439,6 +404,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     /*
      * Probe the host by sending a unicast ARP Request to the host
      */
+    @Override
     public void probe(HostNodeConnector host) {
         log.trace("Received probe host {}", host);
 
@@ -504,6 +470,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     void init() {
         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
+        cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
 
         allocateCaches();
         retrieveCaches();
@@ -536,7 +503,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
 
         try{
             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
-                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
         } catch (CacheConfigException e){
             log.error("ARPHandler cache configuration invalid!");
         } catch (CacheExistException e){
@@ -555,6 +522,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
      *
      */
     void destroy() {
+        cacheEventHandler.interrupt();
     }
 
     /**
@@ -564,7 +532,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
      *
      */
     void start() {
+        stopping = false;
         startPeriodicTimer();
+        cacheEventHandler.start();
     }
 
     /**
@@ -577,6 +547,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     }
 
     void stopping() {
+        stopping = true;
         cancelPeriodicTimer();
     }
 
@@ -613,6 +584,29 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
         return PacketResult.IGNORED;
     }
 
+    private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress, byte[] targetIP) {
+            ARP arp = new ARP();
+            arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
+            arp.setProtocolType(EtherTypes.IPv4.shortValue());
+            arp.setHardwareAddressLength((byte) 6);
+            arp.setProtocolAddressLength((byte) 4);
+            arp.setOpCode(opCode);
+            arp.setSenderHardwareAddress(senderMacAddress) ;
+            arp.setSenderProtocolAddress(senderIP);
+            arp.setTargetHardwareAddress(targetMacAddress);
+            arp.setTargetProtocolAddress(targetIP);
+            return arp;
+    }
+
+    private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) {
+        Ethernet ethernet = new Ethernet();
+        ethernet.setSourceMACAddress(sourceMAC);
+        ethernet.setDestinationMACAddress(targetMAC);
+        ethernet.setEtherType(EtherTypes.ARP.shortValue());
+        ethernet.setPayload(arp);
+        return ethernet;
+    }
+
     private void startPeriodicTimer() {
         this.periodicTimer = new Timer("ArpHandler Periodic Timer");
         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
@@ -657,15 +651,25 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     }
 
     private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
+        if (log.isTraceEnabled()) {
+            log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
+                      HexEncode.bytesToHexString(sourceMAC));
+        }
         Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
         if ((hosts == null) || hosts.isEmpty()) {
+            log.trace("Bailing out no requestors Hosts");
             return;
         }
         countDownTimers.remove(sourceIP);
         for (HostNodeConnector host : hosts) {
-            log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
-                    new Object[] { sourceMAC, sourceIP, host.getDataLayerAddressBytes(), host.getNetworkAddress() });
-
+            if (log.isTraceEnabled()) {
+                log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
+                          new Object[] {
+                              HexEncode.bytesToHexString(sourceMAC),
+                              sourceIP,
+                              HexEncode.bytesToHexString(host.getDataLayerAddressBytes()),
+                              host.getNetworkAddress() });
+            }
             if (connectionManager.isLocal(host.getnodeconnectorNode())){
                 sendARPReply(host.getnodeConnector(),
                         sourceMAC,
@@ -673,6 +677,12 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
                         host.getDataLayerAddressBytes(),
                         host.getNetworkAddress());
             } else {
+                /*
+                 * In the remote event a requestor moved to another
+                 * controller it may turn out it now we need to send
+                 * the ARP reply from a different controller, this
+                 * cover the case
+                 */
                 arpRequestReplyEvent.put(
                         new ARPReply(
                             host.getnodeConnector(),
@@ -687,31 +697,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
 
     @Override
     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
-        if (key instanceof ARPRequest) {
-            ARPRequest req = (ARPRequest) key;
-            // If broadcast request
-            if (req.getHost() == null) {
-                sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
-
-            //If unicast and local, send reply
-            } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
-                sendUcastARPRequest(req.getHost(), req.getSubnet());
-            }
-        } else if (key instanceof ARPReply) {
-            ARPReply rep = (ARPReply) key;
-            // New reply received by controller, notify all awaiting requestors across the cluster
-            if (new_value) {
-                generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
-
-            // Otherwise, a specific reply. If local, send out.
-            } else if (connectionManager.isLocal(rep.getPort().getNode())) {
-                sendARPReply(rep.getPort(),
-                        rep.getSourceMac(),
-                        rep.getSourceIP(),
-                        rep.getTargetMac(),
-                        rep.getTargetIP());
-            }
-        }
+        log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
+        enqueueARPCacheEvent(key, new_value);
     }
 
     @Override
@@ -722,4 +709,62 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
         // nothing to do
     }
+
+    private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
+        try {
+            ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
+            if (!ARPCacheEvents.contains(cacheEvent)) {
+                this.ARPCacheEvents.add(cacheEvent);
+                log.trace("Enqueued {}", event);
+            }
+        } catch (Exception e) {
+            log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
+        }
+    }
+
+    /*
+     * this thread monitors the connectionEvent queue for new incoming events from
+     */
+    private class ARPCacheEventHandler implements Runnable {
+        @Override
+        public void run() {
+            while (!stopping) {
+                try {
+                    ARPCacheEvent ev = ARPCacheEvents.take();
+                    ARPEvent event = ev.getEvent();
+                    if (event instanceof ARPRequest) {
+                        ARPRequest req = (ARPRequest) event;
+                        // If broadcast request
+                        if (req.getHost() == null) {
+                            log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
+                            sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
+
+                        //If unicast and local, send reply
+                        } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
+                            log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
+                            sendUcastARPRequest(req.getHost(), req.getSubnet());
+                        }
+                    } else if (event instanceof ARPReply) {
+                        ARPReply rep = (ARPReply) event;
+                        // New reply received by controller, notify all awaiting requestors across the cluster
+                        if (ev.isNewReply()) {
+                            log.trace("Trigger a generateAndSendReply in response to {}", rep);
+                            generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
+                        // Otherwise, a specific reply. If local, send out.
+                        } else if (connectionManager.isLocal(rep.getPort().getNode())) {
+                            log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
+                            sendARPReply(rep.getPort(),
+                                    rep.getSourceMac(),
+                                    rep.getSourceIP(),
+                                    rep.getTargetMac(),
+                                    rep.getTargetIP());
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    ARPCacheEvents.clear();
+                    return;
+                }
+            }
+        }
+    }
 }