X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Farphandler%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Farphandler%2Finternal%2FArpHandler.java;h=abe104a40638f1a58e47fdcc5f91eda6b1f87359;hb=5c0caf3a94c0ad62c266922b8b303006b633ae0c;hp=0ff1cd9bd6a542dc8694b09933bfe2c1835d3bbd;hpb=b34143e6c369a6746c5d5399ed17964812a106f0;p=controller.git diff --git a/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java b/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java index 0ff1cd9bd6..abe104a406 100644 --- a/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java +++ b/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java @@ -21,10 +21,13 @@ import java.util.HashSet; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; +import org.opendaylight.controller.arphandler.ARPCacheEvent; import org.opendaylight.controller.arphandler.ARPEvent; import org.opendaylight.controller.arphandler.ARPReply; import org.opendaylight.controller.arphandler.ARPRequest; @@ -38,6 +41,7 @@ import org.opendaylight.controller.hosttracker.IfHostListener; import org.opendaylight.controller.hosttracker.IfIptoHost; import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector; import org.opendaylight.controller.hosttracker.hostAware.IHostFinder; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.core.ConstructionException; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; @@ -49,6 +53,7 @@ import org.opendaylight.controller.sal.packet.IPv4; import org.opendaylight.controller.sal.packet.Packet; import org.opendaylight.controller.sal.packet.PacketResult; import org.opendaylight.controller.sal.packet.RawPacket; +import org.opendaylight.controller.sal.routing.IRouting; import org.opendaylight.controller.sal.utils.EtherTypes; import org.opendaylight.controller.sal.utils.HexEncode; import org.opendaylight.controller.sal.utils.NetUtils; @@ -65,12 +70,16 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA private ISwitchManager switchManager; private ITopologyManager topologyManager; private IDataPacketService dataPacketService; + private IRouting routing; private IClusterContainerServices clusterContainerService; private IConnectionManager connectionManager; private Set hostListeners = new CopyOnWriteArraySet(); private ConcurrentMap> arpRequestors; private ConcurrentMap countDownTimers; private Timer periodicTimer; + private BlockingQueue ARPCacheEvents = new LinkedBlockingQueue(); + private Thread cacheEventHandler; + private boolean stopping = false; /* * A cluster allocated cache. Used for synchronizing ARP request/reply * events across all cluster controllers. To raise an event, we put() a specific @@ -101,6 +110,16 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } } + void setRouting(IRouting r) { + this.routing = r; + } + + void unsetRouting(IRouting r) { + if (this.routing == r) { + this.routing = null; + } + } + void setHostListener(IfHostListener s) { if (this.hostListeners != null) { this.hostListeners.add(s); @@ -149,22 +168,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA byte[] tMAC, InetAddress tIP) { byte[] senderIP = sIP.getAddress(); byte[] targetIP = tIP.getAddress(); - ARP arp = new ARP(); - arp.setHardwareType(ARP.HW_TYPE_ETHERNET) - .setProtocolType(EtherTypes.IPv4.shortValue()) - .setHardwareAddressLength((byte) 6) - .setProtocolAddressLength((byte) 4) - .setOpCode(ARP.REPLY) - .setSenderHardwareAddress(sMAC) - .setSenderProtocolAddress(senderIP) - .setTargetHardwareAddress(tMAC) - .setTargetProtocolAddress(targetIP); + ARP arp = createARP(ARP.REPLY,sMAC,senderIP,tMAC,targetIP); - Ethernet ethernet = new Ethernet(); - ethernet.setSourceMACAddress(sMAC) - .setDestinationMACAddress(tMAC) - .setEtherType(EtherTypes.ARP.shortValue()) - .setPayload(arp); + Ethernet ethernet = createEthernet(sMAC, tMAC, arp); RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet); destPkt.setOutgoingNodeConnector(p); @@ -218,7 +224,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA try { requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan()); } catch (ConstructionException e) { - log.debug("Received ARP packet with invalid MAC: {}", sourceMAC); + log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC)); return; } /* @@ -260,9 +266,10 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA */ if ((targetIP.equals(subnet.getNetworkAddress())) && (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) { - if (connectionManager.isLocal(p.getNode())){ + if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL){ if (log.isTraceEnabled()){ - log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}", getControllerMAC()); + log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}", + HexEncode.bytesToHexString(getControllerMAC())); } sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP); } else { @@ -291,7 +298,6 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA //Raise a bcast request event, all controllers need to send one log.trace("Sending a bcast ARP request for {}", targetIP); arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false); - } else { /* * Target host known (across the cluster), send ARP REPLY make sure that targetMAC @@ -299,7 +305,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA */ if (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(host.getDataLayerAddressBytes(), targetMAC)) { log.trace("Received ARP req. for known host {}, sending reply...", targetIP); - if (connectionManager.isLocal(p.getNode())) { + if (connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) { sendARPReply(p, host.getDataLayerAddressBytes(), host.getNetworkAddress(), @@ -324,13 +330,14 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } } - /* + /** * Send a broadcast ARP Request to the switch/ ports using * the networkAddress of the subnet as sender IP * the controller's MAC as sender MAC * the targetIP as the target Network Address */ protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) { + log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet); Set nodeConnectors; if (subnet.isFlatLayer2()) { nodeConnectors = new HashSet(); @@ -340,36 +347,21 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } else { nodeConnectors = subnet.getNodeConnectors(); } + byte[] targetHardwareAddress = new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 }; + //TODO: should use IBroadcastHandler instead for (NodeConnector p : nodeConnectors) { - - //fiter out any non-local or internal ports - if (! connectionManager.isLocal(p.getNode()) || topologyManager.isInternal(p)) { + //filter out any non-local or internal ports + if (!(connectionManager.getLocalityStatus(p.getNode()) == ConnectionLocality.LOCAL) || topologyManager.isInternal(p)) { continue; } - ARP arp = new ARP(); + log.trace("Sending toward nodeConnector:{}", p); byte[] senderIP = subnet.getNetworkAddress().getAddress(); byte[] targetIPByte = targetIP.getAddress(); - arp.setHardwareType(ARP.HW_TYPE_ETHERNET) - .setProtocolType(EtherTypes.IPv4.shortValue()) - .setHardwareAddressLength((byte) 6) - .setProtocolAddressLength((byte) 4) - .setOpCode(ARP.REQUEST) - .setSenderHardwareAddress(getControllerMAC()) - .setSenderProtocolAddress(senderIP) - .setTargetHardwareAddress( - new byte[] { (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0, (byte) 0 }) - .setTargetProtocolAddress(targetIPByte); - - Ethernet ethernet = new Ethernet(); - ethernet.setSourceMACAddress(getControllerMAC()) - .setDestinationMACAddress(new byte[] {(byte) -1, - (byte) -1, - (byte) -1, - (byte) -1, - (byte) -1, - (byte) -1 }) - .setEtherType(EtherTypes.ARP.shortValue()).setPayload(arp); + ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetHardwareAddress, targetIPByte); + + byte[] destMACAddress = NetUtils.getBroadcastMACAddr(); + Ethernet ethernet = createEthernet(getControllerMAC(), destMACAddress, arp); // TODO For now send port-by-port, see how to optimize to // send to multiple ports at once @@ -380,14 +372,14 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } } - /* + /** * Send a unicast ARP Request to the known host on a specific switch/port as * defined in the host. * The sender IP is the networkAddress of the subnet * The sender MAC is the controller's MAC */ protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) { - + log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet); NodeConnector outPort = host.getnodeConnector(); if (outPort == null) { log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host); @@ -397,22 +389,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA byte[] senderIP = subnet.getNetworkAddress().getAddress(); byte[] targetIP = host.getNetworkAddress().getAddress(); byte[] targetMAC = host.getDataLayerAddressBytes(); - ARP arp = new ARP(); - arp.setHardwareType(ARP.HW_TYPE_ETHERNET) - .setProtocolType(EtherTypes.IPv4.shortValue()) - .setHardwareAddressLength((byte) 6) - .setProtocolAddressLength((byte) 4) - .setOpCode(ARP.REQUEST) - .setSenderHardwareAddress(getControllerMAC()) - .setSenderProtocolAddress(senderIP) - .setTargetHardwareAddress(targetMAC) - .setTargetProtocolAddress(targetIP); + ARP arp = createARP(ARP.REQUEST, getControllerMAC(), senderIP, targetMAC, targetIP); - Ethernet ethernet = new Ethernet(); - ethernet.setSourceMACAddress(getControllerMAC()) - .setDestinationMACAddress(targetMAC) - .setEtherType(EtherTypes.ARP.shortValue()) - .setPayload(arp); + Ethernet ethernet = createEthernet(getControllerMAC(), targetMAC, arp); RawPacket destPkt = this.dataPacketService.encodeDataPacket(ethernet); destPkt.setOutgoingNodeConnector(outPort); @@ -420,6 +399,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA this.dataPacketService.transmitDataPacket(destPkt); } + @Override public void find(InetAddress networkAddress) { log.trace("Received find IP {}", networkAddress); @@ -439,6 +419,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA /* * Probe the host by sending a unicast ARP Request to the host */ + @Override public void probe(HostNodeConnector host) { log.trace("Received probe host {}", host); @@ -452,7 +433,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA return; } - if (connectionManager.isLocal(host.getnodeconnectorNode())){ + if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){ log.trace("Send a ucast ARP req. to: {}", host); sendUcastARPRequest(host, subnet); } else { @@ -461,10 +442,13 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } } - /* + /** * An IP packet is punted to the controller, this means that the * destination host is not known to the controller. * Need to discover it by sending a Broadcast ARP Request + * + * @param pkt + * @param p */ protected void handlePuntedIPPacket(IPv4 pkt, NodeConnector p) { @@ -473,6 +457,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA return; } + // try to find a matching subnet Subnet subnet = null; if (switchManager != null) { subnet = switchManager.getSubnetByNetworkAddress(dIP); @@ -481,12 +466,43 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA log.debug("Can't find subnet matching {}, drop packet", dIP); return; } - log.trace("Punted IP pkt from {}, sending bcast ARP event...", dIP); - /* - * unknown destination host, initiate bcast ARP request - */ - arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false); - return; + + // see if we know about the host + HostNodeConnector host = hostTracker.hostFind(dIP); + + if (host == null) { + // if we don't, know about the host, try to find it + log.trace("Punted IP pkt to {}, sending bcast ARP event...", + dIP); + /* + * unknown destination host, initiate bcast ARP request + */ + arpRequestReplyEvent.put(new ARPRequest(dIP, subnet), false); + + } else if (routing == null || + routing.getRoute(p.getNode(), host.getnodeconnectorNode()) != null) { + /* if IRouting is available, make sure that this packet can get it's + * destination normally before teleporting it there. If it's not + * available, then assume it's reachable. + * + * TODO: come up with a way to do this in the absence of IRouting + */ + + log.trace("forwarding punted IP pkt to {} received at {}", dIP, p); + + /* if we know where the host is and there's a path from where this + * packet was punted to where the host is, then deliver it to the + * host for now */ + NodeConnector nc = host.getnodeConnector(); + + // re-encode the Ethernet packet (the parent of the IPv4 packet) + RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent()); + rp.setOutgoingNodeConnector(nc); + this.dataPacketService.transmitDataPacket(rp); + } else { + log.trace("ignoring punted IP pkt to {} because there is no route from {}", + dIP, p); + } } public byte[] getControllerMAC() { @@ -504,12 +520,13 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA void init() { arpRequestors = new ConcurrentHashMap>(); countDownTimers = new ConcurrentHashMap(); + cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread"); allocateCaches(); retrieveCaches(); } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) private void retrieveCaches() { ConcurrentMap map; @@ -526,7 +543,6 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } } - @SuppressWarnings("deprecation") private void allocateCaches() { if (clusterContainerService == null){ nonClusterObjectCreate(); @@ -536,7 +552,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA try{ clusterContainerService.createCache(ARP_EVENT_CACHE_NAME, - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); } catch (CacheConfigException e){ log.error("ARPHandler cache configuration invalid!"); } catch (CacheExistException e){ @@ -555,6 +571,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA * */ void destroy() { + cacheEventHandler.interrupt(); } /** @@ -564,7 +581,9 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA * */ void start() { + stopping = false; startPeriodicTimer(); + cacheEventHandler.start(); } /** @@ -577,6 +596,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } void stopping() { + stopping = true; cancelPeriodicTimer(); } @@ -613,10 +633,32 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA return PacketResult.IGNORED; } + private ARP createARP(short opCode, byte[] senderMacAddress, byte[] senderIP, byte[] targetMacAddress, byte[] targetIP) { + ARP arp = new ARP(); + arp.setHardwareType(ARP.HW_TYPE_ETHERNET); + arp.setProtocolType(EtherTypes.IPv4.shortValue()); + arp.setHardwareAddressLength((byte) 6); + arp.setProtocolAddressLength((byte) 4); + arp.setOpCode(opCode); + arp.setSenderHardwareAddress(senderMacAddress) ; + arp.setSenderProtocolAddress(senderIP); + arp.setTargetHardwareAddress(targetMacAddress); + arp.setTargetProtocolAddress(targetIP); + return arp; + } + + private Ethernet createEthernet(byte[] sourceMAC, byte[] targetMAC, ARP arp) { + Ethernet ethernet = new Ethernet(); + ethernet.setSourceMACAddress(sourceMAC); + ethernet.setDestinationMACAddress(targetMAC); + ethernet.setEtherType(EtherTypes.ARP.shortValue()); + ethernet.setPayload(arp); + return ethernet; + } + private void startPeriodicTimer() { this.periodicTimer = new Timer("ArpHandler Periodic Timer"); this.periodicTimer.scheduleAtFixedRate(new TimerTask() { - @SuppressWarnings("deprecation") @Override public void run() { Set targetIPs = countDownTimers.keySet(); @@ -657,22 +699,38 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA } private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) { + if (log.isTraceEnabled()) { + log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP, + HexEncode.bytesToHexString(sourceMAC)); + } Set hosts = arpRequestors.remove(sourceIP); if ((hosts == null) || hosts.isEmpty()) { + log.trace("Bailing out no requestors Hosts"); return; } countDownTimers.remove(sourceIP); for (HostNodeConnector host : hosts) { - log.trace("Sending ARP Reply with src {}/{}, target {}/{}", - new Object[] { sourceMAC, sourceIP, host.getDataLayerAddressBytes(), host.getNetworkAddress() }); - - if (connectionManager.isLocal(host.getnodeconnectorNode())){ + if (log.isTraceEnabled()) { + log.trace("Sending ARP Reply with src {}/{}, target {}/{}", + new Object[] { + HexEncode.bytesToHexString(sourceMAC), + sourceIP, + HexEncode.bytesToHexString(host.getDataLayerAddressBytes()), + host.getNetworkAddress() }); + } + if (connectionManager.getLocalityStatus(host.getnodeconnectorNode()) == ConnectionLocality.LOCAL){ sendARPReply(host.getnodeConnector(), sourceMAC, sourceIP, host.getDataLayerAddressBytes(), host.getNetworkAddress()); } else { + /* + * In the remote event a requestor moved to another + * controller it may turn out it now we need to send + * the ARP reply from a different controller, this + * cover the case + */ arpRequestReplyEvent.put( new ARPReply( host.getnodeConnector(), @@ -687,31 +745,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA @Override public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) { - if (key instanceof ARPRequest) { - ARPRequest req = (ARPRequest) key; - // If broadcast request - if (req.getHost() == null) { - sendBcastARPRequest(req.getTargetIP(), req.getSubnet()); - - //If unicast and local, send reply - } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) { - sendUcastARPRequest(req.getHost(), req.getSubnet()); - } - } else if (key instanceof ARPReply) { - ARPReply rep = (ARPReply) key; - // New reply received by controller, notify all awaiting requestors across the cluster - if (new_value) { - generateAndSendReply(rep.getTargetIP(), rep.getTargetMac()); - - // Otherwise, a specific reply. If local, send out. - } else if (connectionManager.isLocal(rep.getPort().getNode())) { - sendARPReply(rep.getPort(), - rep.getSourceMac(), - rep.getSourceIP(), - rep.getTargetMac(), - rep.getTargetIP()); - } - } + log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value); + enqueueARPCacheEvent(key, new_value); } @Override @@ -722,4 +757,62 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) { // nothing to do } + + private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) { + try { + ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value); + if (!ARPCacheEvents.contains(cacheEvent)) { + this.ARPCacheEvents.add(cacheEvent); + log.trace("Enqueued {}", event); + } + } catch (Exception e) { + log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event); + } + } + + /* + * this thread monitors the connectionEvent queue for new incoming events from + */ + private class ARPCacheEventHandler implements Runnable { + @Override + public void run() { + while (!stopping) { + try { + ARPCacheEvent ev = ARPCacheEvents.take(); + ARPEvent event = ev.getEvent(); + if (event instanceof ARPRequest) { + ARPRequest req = (ARPRequest) event; + // If broadcast request + if (req.getHost() == null) { + log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req); + sendBcastARPRequest(req.getTargetIP(), req.getSubnet()); + + //If unicast and local, send reply + } else if (connectionManager.getLocalityStatus(req.getHost().getnodeconnectorNode()) == ConnectionLocality.LOCAL) { + log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req); + sendUcastARPRequest(req.getHost(), req.getSubnet()); + } + } else if (event instanceof ARPReply) { + ARPReply rep = (ARPReply) event; + // New reply received by controller, notify all awaiting requestors across the cluster + if (ev.isNewReply()) { + log.trace("Trigger a generateAndSendReply in response to {}", rep); + generateAndSendReply(rep.getTargetIP(), rep.getTargetMac()); + // Otherwise, a specific reply. If local, send out. + } else if (connectionManager.getLocalityStatus(rep.getPort().getNode()) == ConnectionLocality.LOCAL) { + log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep); + sendARPReply(rep.getPort(), + rep.getSourceMac(), + rep.getSourceIP(), + rep.getTargetMac(), + rep.getTargetIP()); + } + } + } catch (InterruptedException e) { + ARPCacheEvents.clear(); + return; + } + } + } + } }