X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fsamples%2Fsimpleforwarding%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsamples%2Fsimpleforwarding%2Finternal%2FSimpleForwardingImpl.java;h=a48d3317738fff283cf37d6eecbbb4cf96781ab4;hp=11056478dbf1742ec48e5cc6bbf9ec3bf01ad56a;hb=e13de280c24593cf8b1ddae56966d50e61f48638;hpb=ff1b4a79cca00743a00c3b0b1100bd0ab2b2fb31 diff --git a/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java b/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java index 11056478db..a48d331773 100644 --- a/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java +++ b/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.samples.simpleforwarding.internal; +import java.net.InetAddress; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -18,7 +19,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.Timer; +import java.util.TimerTask; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; @@ -45,40 +49,170 @@ import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.flowprogrammer.Flow; import org.opendaylight.controller.sal.match.Match; import org.opendaylight.controller.sal.match.MatchType; +import org.opendaylight.controller.sal.packet.Ethernet; +import org.opendaylight.controller.sal.packet.IDataPacketService; +import org.opendaylight.controller.sal.packet.IListenDataPacket; +import org.opendaylight.controller.sal.packet.IPv4; +import org.opendaylight.controller.sal.packet.Packet; +import org.opendaylight.controller.sal.packet.PacketResult; +import org.opendaylight.controller.sal.packet.RawPacket; import org.opendaylight.controller.sal.routing.IListenRoutingUpdates; import org.opendaylight.controller.sal.routing.IRouting; import org.opendaylight.controller.sal.utils.EtherTypes; +import org.opendaylight.controller.sal.utils.NetUtils; import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.samples.simpleforwarding.HostNodePair; import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * This class implements basic L3 forwarding within the managed devices. + * Forwarding is only done within configured subnets.
+ *
+ * The basic flow is that the module listens for new hosts from the + * {@link org.opendaylight.controller.hosttracker.IfIptoHost HostTracker} + * service and on discovering a new host it first calls + * preparePerHostRules() to create a set of new rules that must be + * installed in the network. This is done by repeatedly calling + * updatePerHostRuleInSW() for each switch in the network. Then it + * installs those rules using installPerHostRules(). + */ public class SimpleForwardingImpl implements IfNewHostNotify, - IListenRoutingUpdates, IInventoryListener { - private static Logger log = LoggerFactory - .getLogger(SimpleForwardingImpl.class); + IListenRoutingUpdates, IInventoryListener, IListenDataPacket { + private static Logger log = LoggerFactory.getLogger(SimpleForwardingImpl.class); private static short DEFAULT_IPSWITCH_PRIORITY = 1; + static final String FORWARDING_RULES_CACHE_NAME = "forwarding.ipswitch.rules"; private IfIptoHost hostTracker; private IForwardingRulesManager frm; private ITopologyManager topologyManager; private IRouting routing; + + /** + * The set of all forwarding rules: (host) -> (switch -> flowmod). Note that + * the host includes an attachment point and that while the switch appears + * to be a switch's port, in actuality it is a special port which just + * represents the switch. + */ private ConcurrentMap> rulesDB; private Map> tobePrunedPos = new HashMap>(); private IClusterContainerServices clusterContainerService = null; private ISwitchManager switchManager; + private IDataPacketService dataPacketService; + + /** + * Ip packets that are punted may not have their destination known by hostTracker at the time it + * is presented to SimpleForwardingImpl. Instead of dropping the packet, we will keep it around + * for a 'little' while, to accommodate any transients. See bug 590 for more details. + */ + private class PendingPacketData { + private final static byte MAX_AGE = 2; + + public final IPv4 pkt; + public final NodeConnector incomingNodeConnector; + private byte age; + + public PendingPacketData(IPv4 pkt, NodeConnector incomingNodeConnector) { + this.pkt = pkt; + this.incomingNodeConnector = incomingNodeConnector; + this.age = 0; + } + boolean bumpAgeAndCheckIfTooOld() { return ++age > MAX_AGE; } + } + private static final int MAX_PENDING_PACKET_DESTINATIONS = 64; + private ConcurrentMap pendingPacketDestinations; + private Timer pendingPacketsAgerTimer; + + private class PendingPacketsAgerTimerHandler extends TimerTask { + @Override + public void run() { + if (pendingPacketDestinations == null) { + return; + } + try { + Iterator> iterator = + pendingPacketDestinations.entrySet().iterator(); + while (iterator.hasNext()) { + ConcurrentHashMap.Entry entry = iterator.next(); + InetAddress dIP = entry.getKey(); + PendingPacketData pendingPacketData = entry.getValue(); + + if (pendingPacketData.bumpAgeAndCheckIfTooOld()) { + iterator.remove(); // safe to remove while iterating... + log.debug("Pending packet for {} has been aged out", dIP); + } else { + /** Replace the entry for a key only if currently mapped to some value. + * This will protect the concurrent map against a race where this thread + * would be re-adding an entry that just got taken out. + */ + pendingPacketDestinations.replace(dIP, pendingPacketData); + } + } + } catch (IllegalStateException e) { + log.warn("IllegalStateException Received by PendingPacketsAgerTimerHandler from: {}", + e.getMessage()); + } + } + } + + /** + * Add punted packet to pendingPackets + */ + private void addToPendingPackets(InetAddress dIP, IPv4 pkt, NodeConnector incomingNodeConnector) { + if (pendingPacketDestinations.size() >= MAX_PENDING_PACKET_DESTINATIONS) { + log.info("Will not pend packet for {}: Too many destinations", dIP); + return; + } + + /** TODO: The current implementation allows for up to 1 pending packet per InetAddress. + * This limitation is done for sake of simplicity. A potential enhancement could be to use a + * ConcurrentMultiMap instead of ConcurrentMap. + */ + if (pendingPacketDestinations.containsKey(dIP)) { + log.trace("Will not pend packet for {}: Already have a packet pending", dIP); + return; + } + + PendingPacketData pendingPacketData = new PendingPacketData(pkt, incomingNodeConnector); + pendingPacketDestinations.put(dIP, pendingPacketData); + log.debug("Pending packet for {}", dIP); + } + + /** + * Send punted packet to given destination. This is invoked when there is a certain level of + * hope that the destination is known by hostTracker. + */ + private void sendPendingPacket(InetAddress dIP) { + PendingPacketData pendingPacketData = pendingPacketDestinations.get(dIP); + if (pendingPacketData != null) { + handlePuntedIPPacket(pendingPacketData.pkt, pendingPacketData.incomingNodeConnector, false); + log.trace("Packet for {} is no longer pending", dIP); + pendingPacketDestinations.remove(dIP); + } + } /** * Return codes from the programming of the perHost rules in HW - * */ public enum RulesProgrammingReturnCode { SUCCESS, FAILED_FEW_SWITCHES, FAILED_ALL_SWITCHES, FAILED_WRONG_PARAMS } + public void setDataPacketService(IDataPacketService s) { + log.debug("Setting dataPacketService"); + this.dataPacketService = s; + } + + public void unsetDataPacketService(IDataPacketService s) { + if (this.dataPacketService == s) { + this.dataPacketService = null; + } + } public void setRouting(IRouting routing) { + log.debug("Setting routing"); this.routing = routing; } @@ -88,10 +222,6 @@ public class SimpleForwardingImpl implements IfNewHostNotify, } } - public ITopologyManager getTopologyManager() { - return topologyManager; - } - public void setTopologyManager(ITopologyManager topologyManager) { log.debug("Setting topologyManager"); this.topologyManager = topologyManager; @@ -134,6 +264,15 @@ public class SimpleForwardingImpl implements IfNewHostNotify, public void startUp() { allocateCaches(); retrieveCaches(); + nonClusterObjectCreate(); + } + + public void nonClusterObjectCreate() { + pendingPacketDestinations = new ConcurrentHashMap(); + + /* Pending Packets Ager Timer to go off every 6 seconds to implement pending packet aging */ + pendingPacketsAgerTimer = new Timer(); + pendingPacketsAgerTimer.schedule(new PendingPacketsAgerTimerHandler(), 6000, 6000); } /** @@ -146,16 +285,15 @@ public class SimpleForwardingImpl implements IfNewHostNotify, destroyCaches(); } - @SuppressWarnings("deprecation") - private void allocateCaches() { + private void allocateCaches() { if (this.clusterContainerService == null) { - log.info("un-initialized clusterContainerService, can't create cache"); + log.trace("un-initialized clusterContainerService, can't create cache"); return; } try { - clusterContainerService.createCache("forwarding.ipswitch.rules", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache(FORWARDING_RULES_CACHE_NAME, + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); } catch (CacheExistException cee) { log.error("\nCache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { @@ -163,111 +301,122 @@ public class SimpleForwardingImpl implements IfNewHostNotify, } } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) private void retrieveCaches() { if (this.clusterContainerService == null) { - log.info("un-initialized clusterContainerService, can't retrieve cache"); + log.trace("un-initialized clusterContainerService, can't retrieve cache"); return; } rulesDB = (ConcurrentMap>) clusterContainerService - .getCache("forwarding.ipswitch.rules"); + .getCache(FORWARDING_RULES_CACHE_NAME); if (rulesDB == null) { log.error("\nFailed to get rulesDB handle"); } } - @SuppressWarnings("deprecation") - private void destroyCaches() { + private void destroyCaches() { if (this.clusterContainerService == null) { - log.info("un-initialized clusterContainerService, can't destroy cache"); + log.trace("un-initialized clusterContainerService, can't destroy cache"); return; } - clusterContainerService.destroyCache("forwarding.ipswitch.rules"); + clusterContainerService.destroyCache(FORWARDING_RULES_CACHE_NAME); } - @SuppressWarnings("unused") + /** + * Populates rulesDB with rules specifying how to reach + * host from currNode assuming that: + *
    + *
  • host is attached to rootNode + *
  • link is the next part of the path to reach rootNode + * from currNode + *
  • rulesDB.get(key) represents the list of rules stored about + * host at currNode + *
+ * + * @param host + * The host to be reached. + * @param currNode + * The current node being processed. + * @param rootNode + * The node to be reached. Really, the switch which host is + * attached to. + * @param link + * The link to follow from curNode to get to rootNode + * @param key + * The key to store computed rules at in the rulesDB. For now, + * this is a {@link HostNodePair} of host and currNode. + */ private void updatePerHostRuleInSW(HostNodeConnector host, Node currNode, - Node rootNode, Edge link, HostNodePair key, - Set passedPorts) { + Node rootNode, Edge link, HostNodePair key) { - // link parameter it's optional + // only the link parameter is optional if (host == null || key == null || currNode == null || rootNode == null) { return; } - Set ports = passedPorts; - // TODO: Replace this with SAL equivalent when available - //if (container == null) { - ports = new HashSet(); + + Set ports = new HashSet(); + // add a special port of type ALL and port 0 to represent the node + // without specifying a port on that node ports.add(NodeConnectorCreator.createNodeConnector( NodeConnectorIDType.ALL, NodeConnector.SPECIALNODECONNECTORID, currNode)); - //} HashMap pos = this.rulesDB.get(key); if (pos == null) { pos = new HashMap(); } - if (ports == null) { - log.debug("Empty port list, nothing to do"); - return; - } + for (NodeConnector inPort : ports) { - /* - * skip the port connected to the target host - */ + // skip the port connected to the target host if (currNode.equals(rootNode) && (host.getnodeConnector().equals(inPort))) { continue; } + + // remove the current rule, if any FlowEntry removed_po = pos.remove(inPort); Match match = new Match(); List actions = new ArrayList(); - // IP destination based forwarding - //on /32 entries only! + + // IP destination based forwarding on /32 entries only! match.setField(MatchType.DL_TYPE, EtherTypes.IPv4.shortValue()); match.setField(MatchType.NW_DST, host.getNetworkAddress()); - //Action for the policy if to - //forward to a port except on the - //switch where the host sits, - //which is to rewrite also the MAC - //and to forward on the Host port + /* Action for the policy is to forward to a port except on the + * switch where the host sits, which is to rewrite also the MAC + * and to forward on the Host port */ NodeConnector outPort = null; if (currNode.equals(rootNode)) { + /* If we're at the root node, then rewrite the DL addr and + * possibly pop the VLAN tag. This allows for MAC rewriting + * in the core of the network assuming we can uniquely ID + * packets based on IP address. */ + outPort = host.getnodeConnector(); if (inPort.equals(outPort)) { - /* - * skip the host port - */ + // TODO: isn't this code skipped already by the above continue? + // skip the host port continue; } actions.add(new SetDlDst(host.getDataLayerAddressBytes())); - if (!inPort.getType().equals( - NodeConnectorIDType.ALL)) { - /* - * Container mode: at the destination switch, we need to strip out the tag (VLAN) - */ + if (!inPort.getType().equals(NodeConnectorIDType.ALL)) { + // Container mode: at the destination switch, we need to strip out the tag (VLAN) actions.add(new PopVlan()); } } else { - /* - * currNode is NOT the rootNode - */ + // currNode is NOT the rootNode, find the next hop and create a rule if (link != null) { outPort = link.getTailNodeConnector(); if (inPort.equals(outPort)) { - /* - * skip the outgoing port - */ + // skip the outgoing port continue; } - /* - * If outPort is network link, add VLAN tag - */ + + // If outPort is network link, add VLAN tag if (topologyManager.isInternal(outPort)) { log.debug("outPort {}/{} is internal uplink port", currNode, outPort); @@ -276,11 +425,10 @@ public class SimpleForwardingImpl implements IfNewHostNotify, currNode, outPort); } - if ((!inPort.getType().equals( - NodeConnectorIDType.ALL)) - && (topologyManager.isInternal(outPort))) { + if ((!inPort.getType().equals(NodeConnectorIDType.ALL)) + && (topologyManager.isInternal(outPort))) { Node nextNode = link.getHeadNodeConnector() - .getNode(); + .getNode(); // TODO: Replace this with SAL equivalent //short tag = container.getTag((Long)nextNode.getNodeID()); short tag = 0; @@ -300,9 +448,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, actions.add(new Output(outPort)); } if (!inPort.getType().equals(NodeConnectorIDType.ALL)) { - /* - * include input port in the flow match field - */ + // include input port in the flow match field match.setField(MatchType.IN_PORT, inPort); if (topologyManager.isInternal(inPort)) { @@ -312,9 +458,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, log.debug("inPort {}/{} is host facing port", currNode, inPort); } - /* - * for incoming network link; if the VLAN tag is defined, include it for incoming flow matching - */ + + // for incoming network link; if the VLAN tag is defined, include it for incoming flow matching if (topologyManager.isInternal(inPort)) { // TODO: Replace this with SAL equivalent //short tag = container.getTag((Long)currNode.getNodeID()); @@ -346,9 +491,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, + currNode + "]"; FlowEntry po = new FlowEntry(policyName, flowName, flow, currNode); - // Now save the rule in the DB rule, - // so on updates from topology we can - // selectively + /* Now save the rule in the DB rule, so on updates from topology we + * can selectively */ pos.put(inPort, po); this.rulesDB.put(key, pos); if (!inPort.getType().equals(NodeConnectorIDType.ALL)) { @@ -392,6 +536,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, if (host == null) { return null; } + + //TODO: race condition! unset* functions can make these null. if (this.routing == null) { return null; } @@ -409,6 +555,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, HashMap pos; FlowEntry po; + // for all nodes in the system for (Node node : nodes) { if (node.equals(rootNode)) { // We skip it because for the node with host attached @@ -419,8 +566,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, List links; Path res = this.routing.getRoute(node, rootNode); if ((res == null) || ((links = res.getEdges()) == null)) { - // Still the path that connect node to rootNode - // doesn't exists + // No route from node to rootNode can be found, back out any + // existing forwarding rules if they exist. log.debug("NO Route/Path between SW[{}] --> SW[{}] cleaning " + "potentially existing entries", node, rootNode); key = new HostNodePair(host, node); @@ -429,7 +576,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, for (Map.Entry e : pos.entrySet()) { po = e.getValue(); if (po != null) { - //Uninstall the policy + // uninstall any existing rules we put in the + // ForwardingRulesManager this.frm.uninstallFlowEntry(po); } } @@ -439,31 +587,27 @@ public class SimpleForwardingImpl implements IfNewHostNotify, } log.debug("Route between SW[{}] --> SW[{}]", node, rootNode); - Integer curr; Node currNode = node; key = new HostNodePair(host, currNode); - Edge link; - for (curr = 0; curr < links.size(); curr++) { - link = links.get(curr); + + // for each link in the route from here to there + for (Edge link : links) { if (link == null) { log.error("Could not retrieve the Link"); + // TODO: should we keep going? continue; } log.debug(link.toString()); // Index all the switches to be programmed - // switchesToProgram.add(currNode); - Set ports = null; - ports = switchManager.getUpNodeConnectors(currNode); - updatePerHostRuleInSW(host, currNode, rootNode, link, key, - ports); + updatePerHostRuleInSW(host, currNode, rootNode, link, key); if ((this.rulesDB.get(key)) != null) { - /* - * Calling updatePerHostRuleInSW() doesn't guarantee that rules will be - * added in currNode (e.g, there is only one link from currNode to rootNode - * This check makes sure that there are some rules in the rulesDB for the - * given key prior to adding switch to switchesToProgram + /* Calling updatePerHostRuleInSW() doesn't guarantee that + * rules will be added in currNode (e.g, there is only one + * link from currNode to rootNode This check makes sure that + * there are some rules in the rulesDB for the given key + * prior to adding switch to switchesToProgram */ switchesToProgram.add(currNode); } @@ -477,10 +621,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, // multiple hosts attached to it but not yet connected to the // rest of the world switchesToProgram.add(rootNode); - Set ports = switchManager - .getUpNodeConnectors(rootNode); - updatePerHostRuleInSW(host, rootNode, rootNode, null, new HostNodePair( - host, rootNode), ports); + updatePerHostRuleInSW(host, rootNode, rootNode, null, + new HostNodePair(host, rootNode)); // log.debug("Getting out at the end!"); return switchesToProgram; @@ -488,7 +630,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, /** * Calculate the per-Host rules to be installed in the rulesDB - * from a specific switch when a host facing port comes up. + * from a specific switch when a host facing port comes up. * These rules will later on be installed in HW. This routine * will implicitly calculate the shortest path from the switch * where the port has come up to the switch where host is , @@ -521,14 +663,11 @@ public class SimpleForwardingImpl implements IfNewHostNotify, HostNodePair key; Map pos; FlowEntry po; - Set ports = new HashSet(); - ports.add(swport); List links; Path res = this.routing.getRoute(node, rootNode); if ((res == null) || ((links = res.getEdges()) == null)) { - // Still the path that connect node to rootNode - // doesn't exists + // the routing service doesn't know how to get there from here log.debug("NO Route/Path between SW[{}] --> SW[{}] cleaning " + "potentially existing entries", node, rootNode); key = new HostNodePair(host, node); @@ -565,7 +704,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, // Index all the switches to be programmed switchesToProgram.add(currNode); - updatePerHostRuleInSW(host, currNode, rootNode, link, key, ports); + updatePerHostRuleInSW(host, currNode, rootNode, link, key); break; // come out of the loop for port up case, interested only in programming one switch } @@ -614,7 +753,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, po = e.getValue(); if (po != null) { // Populate the Policy field now - Status poStatus = this.frm.installFlowEntry(po); + Status poStatus = this.frm.modifyOrAddFlowEntry(po); if (!poStatus.isSuccess()) { log.error("Failed to install policy: " + po.getGroupName() + " (" @@ -650,8 +789,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, * * @return a return code that convey the programming status of the HW */ - private RulesProgrammingReturnCode uninstallPerHostRules( - HostNodeConnector host) { + private RulesProgrammingReturnCode uninstallPerHostRules(HostNodeConnector host) { RulesProgrammingReturnCode retCode = RulesProgrammingReturnCode.SUCCESS; Map pos; FlowEntry po; @@ -717,6 +855,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, public void recalculateDone() { if (this.hostTracker == null) { //Not yet ready to process all the updates + //TODO: we should make sure that this call is executed eventually return; } Set allHosts = this.hostTracker.getAllHosts(); @@ -747,16 +886,12 @@ public class SimpleForwardingImpl implements IfNewHostNotify, for (Node swId : switches) { List pl = tobePrunedPos.get(swId); if (pl != null) { - log - .debug( - "Policies for Switch: {} in the list to be deleted: {}", - swId, pl); + log.debug("Policies for Switch: {} in the list to be deleted: {}", swId, pl); Iterator plIter = pl.iterator(); //for (Policy po: pl) { while (plIter.hasNext()) { FlowEntry po = plIter.next(); - log.error("Removing Policy, Switch: {} Policy: {}", swId, - po); + log.error("Removing Policy, Switch: {} Policy: {}", swId, po); this.frm.uninstallFlowEntry(po); plIter.remove(); } @@ -765,7 +900,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, } } - /* + /** * A Host facing port has come up in a container. Add rules on the switch where this * port has come up for all the known hosts to the controller. * @param swId switch id of the port where port came up @@ -779,8 +914,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify, log.debug("Host Facing Port in a container came up, install the rules for all hosts from this port !"); Set allHosts = this.hostTracker.getAllHosts(); for (HostNodeConnector host : allHosts) { - if (node.equals(host.getnodeconnectorNode()) - && swPort.equals(host.getnodeConnector())) { + if (node.equals(host.getnodeconnectorNode())) { /* * This host resides behind the same switch and port for which a port up * message is received. Ideally this should not happen, but if it does, @@ -807,6 +941,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify, Set switches = preparePerHostRules(host); if (switches != null) { installPerHostRules(host, switches); + + // Green light for sending pending packet to this host. Safe to call if there are none. + sendPendingPacket(host.getNetworkAddress()); } } @@ -821,8 +958,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify, @Override public void notifyNode(Node node, UpdateType type, Map propMap) { - if (node == null) + if (node == null) { return; + } switch (type) { case REMOVED: @@ -837,8 +975,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify, @Override public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map propMap) { - if (nodeConnector == null) + if (nodeConnector == null) { return; + } boolean up = false; switch (type) { @@ -929,6 +1068,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, * */ void stop() { + pendingPacketsAgerTimer.cancel(); + pendingPacketDestinations.clear(); } public void setSwitchManager(ISwitchManager switchManager) { @@ -940,4 +1081,65 @@ public class SimpleForwardingImpl implements IfNewHostNotify, this.switchManager = null; } } + + @Override + public PacketResult receiveDataPacket(RawPacket inPkt) { + if (inPkt == null) { + return PacketResult.IGNORED; + } + log.trace("Received a frame of size: {}", inPkt.getPacketData().length); + Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt); + if (formattedPak instanceof Ethernet) { + Object nextPak = formattedPak.getPayload(); + if (nextPak instanceof IPv4) { + log.trace("Handle punted IP packet: {}", formattedPak); + handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector(), true); + } + } + return PacketResult.IGNORED; + + } + + private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector, boolean allowAddPending) { + InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress()); + if (dIP == null || hostTracker == null) { + log.debug("Invalid param(s) in handlePuntedIPPacket.. DestIP: {}. hostTracker: {}", dIP, hostTracker); + return; + } + HostNodeConnector destHost = hostTracker.hostFind(dIP); + /* + * In cases when incoming and outgoing connectors are in the same node, there is no need + * to verify that there is a route. Because of that, we will only need routing.getRoute() + * if we know that src and dst nodes are different. + */ + if (destHost != null + && (incomingNodeConnector.getNode().equals(destHost.getnodeconnectorNode()) || + routing == null || + routing.getRoute(incomingNodeConnector.getNode(), destHost.getnodeconnectorNode()) != null)) { + + log.trace("Host {} is at {}", dIP, destHost.getnodeConnector()); + + // If SimpleForwarding is aware of this host, it will try to install + // a path. Forward packet until it's done. + if (dataPacketService != null) { + + /* + * if we know where the host is and there's a path from where this + * packet was punted to where the host is, then attempt best effort delivery to the host + */ + NodeConnector nc = destHost.getnodeConnector(); + log.trace("Forwarding punted IP received at {} to {}", incomingNodeConnector, nc); + // re-encode the Ethernet packet (the parent of the IPv4 packet) + RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent()); + rp.setOutgoingNodeConnector(nc); + this.dataPacketService.transmitDataPacket(rp); + } + } else if (allowAddPending) { + // If we made it here, let's hang on to the punted packet, with hopes that its destination + // will become available soon. + addToPendingPackets(dIP, pkt, incomingNodeConnector); + } else { + log.warn("Dropping punted IP packet received at {} to Host {}", incomingNodeConnector, dIP); + } + } }