Bug-590: Packet loss on first time ping test
[controller.git] / opendaylight / samples / simpleforwarding / src / main / java / org / opendaylight / controller / samples / simpleforwarding / internal / SimpleForwardingImpl.java
index 4b3363b832d72ebbd12215d7236bba43009f4af3..d6957385bd8278d566ba31fc0cf795539d1161c8 100644 (file)
@@ -9,6 +9,7 @@
 
 package org.opendaylight.controller.samples.simpleforwarding.internal;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -18,7 +19,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
@@ -45,9 +49,17 @@ import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
 import org.opendaylight.controller.sal.match.Match;
 import org.opendaylight.controller.sal.match.MatchType;
+import org.opendaylight.controller.sal.packet.Ethernet;
+import org.opendaylight.controller.sal.packet.IDataPacketService;
+import org.opendaylight.controller.sal.packet.IListenDataPacket;
+import org.opendaylight.controller.sal.packet.IPv4;
+import org.opendaylight.controller.sal.packet.Packet;
+import org.opendaylight.controller.sal.packet.PacketResult;
+import org.opendaylight.controller.sal.packet.RawPacket;
 import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
 import org.opendaylight.controller.sal.routing.IRouting;
 import org.opendaylight.controller.sal.utils.EtherTypes;
+import org.opendaylight.controller.sal.utils.NetUtils;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.samples.simpleforwarding.HostNodePair;
@@ -70,11 +82,10 @@ import org.slf4j.LoggerFactory;
  * installs those rules using <tt>installPerHostRules()</tt>.
  */
 public class SimpleForwardingImpl implements IfNewHostNotify,
-        IListenRoutingUpdates, IInventoryListener {
-    private static Logger log = LoggerFactory
-            .getLogger(SimpleForwardingImpl.class);
+        IListenRoutingUpdates, IInventoryListener, IListenDataPacket {
+    private static Logger log = LoggerFactory.getLogger(SimpleForwardingImpl.class);
     private static short DEFAULT_IPSWITCH_PRIORITY = 1;
-    private static String FORWARDING_RULES_CACHE_NAME = "forwarding.ipswitch.rules";
+    static final String FORWARDING_RULES_CACHE_NAME = "forwarding.ipswitch.rules";
     private IfIptoHost hostTracker;
     private IForwardingRulesManager frm;
     private ITopologyManager topologyManager;
@@ -90,6 +101,99 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
     private Map<Node, List<FlowEntry>> tobePrunedPos = new HashMap<Node, List<FlowEntry>>();
     private IClusterContainerServices clusterContainerService = null;
     private ISwitchManager switchManager;
+    private IDataPacketService dataPacketService;
+
+    /**
+     * Ip packets that are punted may not have their destination known by hostTracker at the time it
+     * is presented to SimpleForwardingImpl. Instead of dropping the packet, we will keep it around
+     * for a 'little' while, to accommodate any transients. See bug 590 for more details.
+     */
+    private class PendingPacketData {
+        private final static byte MAX_AGE = 2;
+
+        public final IPv4 pkt;
+        public final NodeConnector incomingNodeConnector;
+        private byte age;
+
+        public PendingPacketData(IPv4 pkt, NodeConnector incomingNodeConnector) {
+            this.pkt = pkt;
+            this.incomingNodeConnector = incomingNodeConnector;
+            this.age = 0;
+        }
+        boolean bumpAgeAndCheckIfTooOld() { return ++age > MAX_AGE; }
+    }
+    private static final int MAX_PENDING_PACKET_DESTINATIONS = 64;
+    private ConcurrentMap<InetAddress, PendingPacketData> pendingPacketDestinations;
+    private Timer pendingPacketsAgerTimer;
+
+    private class PendingPacketsAgerTimerHandler extends TimerTask {
+        @Override
+        public void run() {
+            if (pendingPacketDestinations == null) {
+                return;
+            }
+            try {
+                Iterator<ConcurrentMap.Entry<InetAddress, PendingPacketData>> iterator =
+                        pendingPacketDestinations.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    ConcurrentHashMap.Entry<InetAddress, PendingPacketData> entry = iterator.next();
+                    InetAddress dIP = entry.getKey();
+                    PendingPacketData pendingPacketData = entry.getValue();
+
+                    if (pendingPacketData.bumpAgeAndCheckIfTooOld()) {
+                        iterator.remove(); // safe to remove while iterating...
+                        log.debug("Pending packet for {} has been aged out", dIP);
+                    } else {
+                        /** Replace the entry for a key only if currently mapped to some value.
+                         * This will protect the concurrent map against a race where this thread
+                         * would be re-adding an entry that just got taken out.
+                         */
+                        pendingPacketDestinations.replace(dIP, pendingPacketData);
+                    }
+                }
+            } catch (IllegalStateException e) {
+                log.debug("IllegalStateException Received by PendingPacketsAgerTimerHandler from: {}",
+                        e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Add punted packet to pendingPackets
+     */
+    private void addToPendingPackets(InetAddress dIP, IPv4 pkt, NodeConnector incomingNodeConnector) {
+        if (pendingPacketDestinations.size() >= MAX_PENDING_PACKET_DESTINATIONS) {
+            log.info("Will not pend packet for {}: Too many destinations", dIP);
+            return;
+        }
+
+        /** TODO: The current implementation allows for up to 1 pending packet per InetAddress.
+         * This limitation is done for sake of simplicity. A potential enhancement could be to use a
+         * ConcurrentMultiMap instead of ConcurrentMap.
+         */
+        if (pendingPacketDestinations.containsKey(dIP)) {
+            log.trace("Will not pend packet for {}: Already have a packet pending", dIP);
+            return;
+        }
+
+        PendingPacketData pendingPacketData = new PendingPacketData(pkt, incomingNodeConnector);
+        pendingPacketDestinations.put(dIP, pendingPacketData);
+        log.debug("Pending packet for {}", dIP);
+    }
+
+    /**
+     * Send punted packet to given destination. This is invoked when there is a certain level of
+     * hope that the destination is known by hostTracker.
+     */
+    private void sendPendingPacket(InetAddress dIP) {
+        pendingPacketDestinations.get(dIP);
+        PendingPacketData pendingPacketData = pendingPacketDestinations.get(dIP);
+        if (pendingPacketData != null) {
+            handlePuntedIPPacket(pendingPacketData.pkt, pendingPacketData.incomingNodeConnector, false);
+            log.trace("Packet for {} is no longer pending", dIP);
+            pendingPacketDestinations.remove(dIP);
+        }
+    }
 
     /**
      * Return codes from the programming of the perHost rules in HW
@@ -97,8 +201,19 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
     public enum RulesProgrammingReturnCode {
         SUCCESS, FAILED_FEW_SWITCHES, FAILED_ALL_SWITCHES, FAILED_WRONG_PARAMS
     }
+    public void setDataPacketService(IDataPacketService s) {
+        log.debug("Setting dataPacketService");
+        this.dataPacketService = s;
+    }
+
+    public void unsetDataPacketService(IDataPacketService s) {
+        if (this.dataPacketService == s) {
+            this.dataPacketService = null;
+        }
+    }
 
     public void setRouting(IRouting routing) {
+        log.debug("Setting routing");
         this.routing = routing;
     }
 
@@ -108,10 +223,6 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
         }
     }
 
-    public ITopologyManager getTopologyManager() {
-        return topologyManager;
-    }
-
     public void setTopologyManager(ITopologyManager topologyManager) {
         log.debug("Setting topologyManager");
         this.topologyManager = topologyManager;
@@ -154,6 +265,15 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
     public void startUp() {
         allocateCaches();
         retrieveCaches();
+        nonClusterObjectCreate();
+    }
+
+    public void nonClusterObjectCreate() {
+        pendingPacketDestinations = new ConcurrentHashMap<InetAddress, PendingPacketData>();
+
+        /* Pending Packets Ager Timer to go off every 6 seconds to implement pending packet aging */
+        pendingPacketsAgerTimer = new Timer();
+        pendingPacketsAgerTimer.schedule(new PendingPacketsAgerTimerHandler(), 6000, 6000);
     }
 
     /**
@@ -670,8 +790,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
      *
      * @return a return code that convey the programming status of the HW
      */
-    private RulesProgrammingReturnCode uninstallPerHostRules(
-            HostNodeConnector host) {
+    private RulesProgrammingReturnCode uninstallPerHostRules(HostNodeConnector host) {
         RulesProgrammingReturnCode retCode = RulesProgrammingReturnCode.SUCCESS;
         Map<NodeConnector, FlowEntry> pos;
         FlowEntry po;
@@ -768,16 +887,12 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
         for (Node swId : switches) {
             List<FlowEntry> pl = tobePrunedPos.get(swId);
             if (pl != null) {
-                log
-                        .debug(
-                                "Policies for Switch: {} in the list to be deleted: {}",
-                                swId, pl);
+                log.debug("Policies for Switch: {} in the list to be deleted: {}", swId, pl);
                 Iterator<FlowEntry> plIter = pl.iterator();
                 //for (Policy po: pl) {
                 while (plIter.hasNext()) {
                     FlowEntry po = plIter.next();
-                    log.error("Removing Policy, Switch: {} Policy: {}", swId,
-                            po);
+                    log.error("Removing Policy, Switch: {} Policy: {}", swId, po);
                     this.frm.uninstallFlowEntry(po);
                     plIter.remove();
                 }
@@ -827,6 +942,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
         Set<Node> switches = preparePerHostRules(host);
         if (switches != null) {
             installPerHostRules(host, switches);
+
+            // Green light for sending pending packet to this host. Safe to call if there are none.
+            sendPendingPacket(host.getNetworkAddress());
         }
     }
 
@@ -951,6 +1069,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
      *
      */
     void stop() {
+        pendingPacketsAgerTimer.cancel();
+        pendingPacketDestinations.clear();
     }
 
     public void setSwitchManager(ISwitchManager switchManager) {
@@ -962,4 +1082,65 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
             this.switchManager = null;
         }
     }
+
+    @Override
+    public PacketResult receiveDataPacket(RawPacket inPkt) {
+        if (inPkt == null) {
+            return PacketResult.IGNORED;
+        }
+        log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
+        Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
+        if (formattedPak instanceof Ethernet) {
+            Object nextPak = formattedPak.getPayload();
+            if (nextPak instanceof IPv4) {
+                log.trace("Handle punted IP packet: {}", formattedPak);
+                handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector(), true);
+            }
+        }
+        return PacketResult.IGNORED;
+
+    }
+
+    private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector, boolean allowAddPending) {
+        InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
+        if (dIP == null || hostTracker == null) {
+            log.debug("Invalid param(s) in handlePuntedIPPacket.. DestIP: {}. hostTracker: {}", dIP, hostTracker);
+            return;
+        }
+        HostNodeConnector destHost = hostTracker.hostFind(dIP);
+        /*
+         * In cases when incoming and outgoing connectors are in the same node, there is no need
+         * to verify that there is a route. Because of that, we will only need routing.getRoute()
+         * if we know that src and dst nodes are different.
+         */
+        if (destHost != null
+                && (incomingNodeConnector.getNode().equals(destHost.getnodeconnectorNode()) ||
+                    routing == null ||
+                    routing.getRoute(incomingNodeConnector.getNode(), destHost.getnodeconnectorNode()) != null)) {
+
+            log.trace("Host {} is at {}", dIP, destHost.getnodeConnector());
+
+            // If SimpleForwarding is aware of this host, it will try to install
+            // a path. Forward packet until it's done.
+            if (dataPacketService != null) {
+
+                /*
+                 * if we know where the host is and there's a path from where this
+                 * packet was punted to where the host is, then attempt best effort delivery to the host
+                 */
+                NodeConnector nc = destHost.getnodeConnector();
+                log.trace("Forwarding punted IP received at {} to {}", incomingNodeConnector, nc);
+                // re-encode the Ethernet packet (the parent of the IPv4 packet)
+                RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent());
+                rp.setOutgoingNodeConnector(nc);
+                this.dataPacketService.transmitDataPacket(rp);
+            }
+        } else if (allowAddPending) {
+            // If we made it here, let's hang on to the punted packet, with hopes that its destination
+            // will become available soon.
+            addToPendingPackets(dIP, pkt, incomingNodeConnector);
+        } else {
+            log.warn("Dropping punted IP packet received at {} to Host {}", incomingNodeConnector, dIP);
+        }
+    }
 }