X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fsamples%2Fsimpleforwarding%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsamples%2Fsimpleforwarding%2Finternal%2FSimpleForwardingImpl.java;h=d6957385bd8278d566ba31fc0cf795539d1161c8;hb=d3cb37167e6327b4f280e44bdf92ea95fb240fb8;hp=043e5c3f0db1768449a5d580af2172f9aa828e12;hpb=b45957ba3b729ca2c406c893d3db24f5f6363df1;p=controller.git
diff --git a/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java b/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java
index 043e5c3f0d..d6957385bd 100644
--- a/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java
+++ b/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java
@@ -9,6 +9,7 @@
package org.opendaylight.controller.samples.simpleforwarding.internal;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -18,7 +19,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.Timer;
+import java.util.TimerTask;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
@@ -45,9 +49,17 @@ import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.flowprogrammer.Flow;
import org.opendaylight.controller.sal.match.Match;
import org.opendaylight.controller.sal.match.MatchType;
+import org.opendaylight.controller.sal.packet.Ethernet;
+import org.opendaylight.controller.sal.packet.IDataPacketService;
+import org.opendaylight.controller.sal.packet.IListenDataPacket;
+import org.opendaylight.controller.sal.packet.IPv4;
+import org.opendaylight.controller.sal.packet.Packet;
+import org.opendaylight.controller.sal.packet.PacketResult;
+import org.opendaylight.controller.sal.packet.RawPacket;
import org.opendaylight.controller.sal.routing.IListenRoutingUpdates;
import org.opendaylight.controller.sal.routing.IRouting;
import org.opendaylight.controller.sal.utils.EtherTypes;
+import org.opendaylight.controller.sal.utils.NetUtils;
import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.samples.simpleforwarding.HostNodePair;
@@ -70,11 +82,10 @@ import org.slf4j.LoggerFactory;
* installs those rules using installPerHostRules().
*/
public class SimpleForwardingImpl implements IfNewHostNotify,
- IListenRoutingUpdates, IInventoryListener {
- private static Logger log = LoggerFactory
- .getLogger(SimpleForwardingImpl.class);
+ IListenRoutingUpdates, IInventoryListener, IListenDataPacket {
+ private static Logger log = LoggerFactory.getLogger(SimpleForwardingImpl.class);
private static short DEFAULT_IPSWITCH_PRIORITY = 1;
- private static String FORWARDING_RULES_CACHE_NAME = "forwarding.ipswitch.rules";
+ static final String FORWARDING_RULES_CACHE_NAME = "forwarding.ipswitch.rules";
private IfIptoHost hostTracker;
private IForwardingRulesManager frm;
private ITopologyManager topologyManager;
@@ -90,6 +101,99 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
private Map> tobePrunedPos = new HashMap>();
private IClusterContainerServices clusterContainerService = null;
private ISwitchManager switchManager;
+ private IDataPacketService dataPacketService;
+
+ /**
+ * Ip packets that are punted may not have their destination known by hostTracker at the time it
+ * is presented to SimpleForwardingImpl. Instead of dropping the packet, we will keep it around
+ * for a 'little' while, to accommodate any transients. See bug 590 for more details.
+ */
+ private class PendingPacketData {
+ private final static byte MAX_AGE = 2;
+
+ public final IPv4 pkt;
+ public final NodeConnector incomingNodeConnector;
+ private byte age;
+
+ public PendingPacketData(IPv4 pkt, NodeConnector incomingNodeConnector) {
+ this.pkt = pkt;
+ this.incomingNodeConnector = incomingNodeConnector;
+ this.age = 0;
+ }
+ boolean bumpAgeAndCheckIfTooOld() { return ++age > MAX_AGE; }
+ }
+ private static final int MAX_PENDING_PACKET_DESTINATIONS = 64;
+ private ConcurrentMap pendingPacketDestinations;
+ private Timer pendingPacketsAgerTimer;
+
+ private class PendingPacketsAgerTimerHandler extends TimerTask {
+ @Override
+ public void run() {
+ if (pendingPacketDestinations == null) {
+ return;
+ }
+ try {
+ Iterator> iterator =
+ pendingPacketDestinations.entrySet().iterator();
+ while (iterator.hasNext()) {
+ ConcurrentHashMap.Entry entry = iterator.next();
+ InetAddress dIP = entry.getKey();
+ PendingPacketData pendingPacketData = entry.getValue();
+
+ if (pendingPacketData.bumpAgeAndCheckIfTooOld()) {
+ iterator.remove(); // safe to remove while iterating...
+ log.debug("Pending packet for {} has been aged out", dIP);
+ } else {
+ /** Replace the entry for a key only if currently mapped to some value.
+ * This will protect the concurrent map against a race where this thread
+ * would be re-adding an entry that just got taken out.
+ */
+ pendingPacketDestinations.replace(dIP, pendingPacketData);
+ }
+ }
+ } catch (IllegalStateException e) {
+ log.debug("IllegalStateException Received by PendingPacketsAgerTimerHandler from: {}",
+ e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Add punted packet to pendingPackets
+ */
+ private void addToPendingPackets(InetAddress dIP, IPv4 pkt, NodeConnector incomingNodeConnector) {
+ if (pendingPacketDestinations.size() >= MAX_PENDING_PACKET_DESTINATIONS) {
+ log.info("Will not pend packet for {}: Too many destinations", dIP);
+ return;
+ }
+
+ /** TODO: The current implementation allows for up to 1 pending packet per InetAddress.
+ * This limitation is done for sake of simplicity. A potential enhancement could be to use a
+ * ConcurrentMultiMap instead of ConcurrentMap.
+ */
+ if (pendingPacketDestinations.containsKey(dIP)) {
+ log.trace("Will not pend packet for {}: Already have a packet pending", dIP);
+ return;
+ }
+
+ PendingPacketData pendingPacketData = new PendingPacketData(pkt, incomingNodeConnector);
+ pendingPacketDestinations.put(dIP, pendingPacketData);
+ log.debug("Pending packet for {}", dIP);
+ }
+
+ /**
+ * Send punted packet to given destination. This is invoked when there is a certain level of
+ * hope that the destination is known by hostTracker.
+ */
+ private void sendPendingPacket(InetAddress dIP) {
+ pendingPacketDestinations.get(dIP);
+ PendingPacketData pendingPacketData = pendingPacketDestinations.get(dIP);
+ if (pendingPacketData != null) {
+ handlePuntedIPPacket(pendingPacketData.pkt, pendingPacketData.incomingNodeConnector, false);
+ log.trace("Packet for {} is no longer pending", dIP);
+ pendingPacketDestinations.remove(dIP);
+ }
+ }
/**
* Return codes from the programming of the perHost rules in HW
@@ -97,8 +201,19 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
public enum RulesProgrammingReturnCode {
SUCCESS, FAILED_FEW_SWITCHES, FAILED_ALL_SWITCHES, FAILED_WRONG_PARAMS
}
+ public void setDataPacketService(IDataPacketService s) {
+ log.debug("Setting dataPacketService");
+ this.dataPacketService = s;
+ }
+
+ public void unsetDataPacketService(IDataPacketService s) {
+ if (this.dataPacketService == s) {
+ this.dataPacketService = null;
+ }
+ }
public void setRouting(IRouting routing) {
+ log.debug("Setting routing");
this.routing = routing;
}
@@ -108,10 +223,6 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
}
}
- public ITopologyManager getTopologyManager() {
- return topologyManager;
- }
-
public void setTopologyManager(ITopologyManager topologyManager) {
log.debug("Setting topologyManager");
this.topologyManager = topologyManager;
@@ -154,6 +265,15 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
public void startUp() {
allocateCaches();
retrieveCaches();
+ nonClusterObjectCreate();
+ }
+
+ public void nonClusterObjectCreate() {
+ pendingPacketDestinations = new ConcurrentHashMap();
+
+ /* Pending Packets Ager Timer to go off every 6 seconds to implement pending packet aging */
+ pendingPacketsAgerTimer = new Timer();
+ pendingPacketsAgerTimer.schedule(new PendingPacketsAgerTimerHandler(), 6000, 6000);
}
/**
@@ -166,10 +286,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
destroyCaches();
}
- @SuppressWarnings("deprecation")
- private void allocateCaches() {
+ private void allocateCaches() {
if (this.clusterContainerService == null) {
- log.info("un-initialized clusterContainerService, can't create cache");
+ log.trace("un-initialized clusterContainerService, can't create cache");
return;
}
@@ -183,10 +302,10 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
}
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
if (this.clusterContainerService == null) {
- log.info("un-initialized clusterContainerService, can't retrieve cache");
+ log.trace("un-initialized clusterContainerService, can't retrieve cache");
return;
}
@@ -197,10 +316,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
}
}
- @SuppressWarnings("deprecation")
- private void destroyCaches() {
+ private void destroyCaches() {
if (this.clusterContainerService == null) {
- log.info("un-initialized clusterContainerService, can't destroy cache");
+ log.trace("un-initialized clusterContainerService, can't destroy cache");
return;
}
@@ -636,7 +754,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
po = e.getValue();
if (po != null) {
// Populate the Policy field now
- Status poStatus = this.frm.installFlowEntry(po);
+ Status poStatus = this.frm.modifyOrAddFlowEntry(po);
if (!poStatus.isSuccess()) {
log.error("Failed to install policy: "
+ po.getGroupName() + " ("
@@ -672,8 +790,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
*
* @return a return code that convey the programming status of the HW
*/
- private RulesProgrammingReturnCode uninstallPerHostRules(
- HostNodeConnector host) {
+ private RulesProgrammingReturnCode uninstallPerHostRules(HostNodeConnector host) {
RulesProgrammingReturnCode retCode = RulesProgrammingReturnCode.SUCCESS;
Map pos;
FlowEntry po;
@@ -770,16 +887,12 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
for (Node swId : switches) {
List pl = tobePrunedPos.get(swId);
if (pl != null) {
- log
- .debug(
- "Policies for Switch: {} in the list to be deleted: {}",
- swId, pl);
+ log.debug("Policies for Switch: {} in the list to be deleted: {}", swId, pl);
Iterator plIter = pl.iterator();
//for (Policy po: pl) {
while (plIter.hasNext()) {
FlowEntry po = plIter.next();
- log.error("Removing Policy, Switch: {} Policy: {}", swId,
- po);
+ log.error("Removing Policy, Switch: {} Policy: {}", swId, po);
this.frm.uninstallFlowEntry(po);
plIter.remove();
}
@@ -802,8 +915,7 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
log.debug("Host Facing Port in a container came up, install the rules for all hosts from this port !");
Set allHosts = this.hostTracker.getAllHosts();
for (HostNodeConnector host : allHosts) {
- if (node.equals(host.getnodeconnectorNode())
- && swPort.equals(host.getnodeConnector())) {
+ if (node.equals(host.getnodeconnectorNode())) {
/*
* This host resides behind the same switch and port for which a port up
* message is received. Ideally this should not happen, but if it does,
@@ -830,6 +942,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
Set switches = preparePerHostRules(host);
if (switches != null) {
installPerHostRules(host, switches);
+
+ // Green light for sending pending packet to this host. Safe to call if there are none.
+ sendPendingPacket(host.getNetworkAddress());
}
}
@@ -844,8 +959,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
@Override
public void notifyNode(Node node, UpdateType type,
Map propMap) {
- if (node == null)
+ if (node == null) {
return;
+ }
switch (type) {
case REMOVED:
@@ -860,8 +976,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
@Override
public void notifyNodeConnector(NodeConnector nodeConnector,
UpdateType type, Map propMap) {
- if (nodeConnector == null)
+ if (nodeConnector == null) {
return;
+ }
boolean up = false;
switch (type) {
@@ -952,6 +1069,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
*
*/
void stop() {
+ pendingPacketsAgerTimer.cancel();
+ pendingPacketDestinations.clear();
}
public void setSwitchManager(ISwitchManager switchManager) {
@@ -963,4 +1082,65 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
this.switchManager = null;
}
}
+
+ @Override
+ public PacketResult receiveDataPacket(RawPacket inPkt) {
+ if (inPkt == null) {
+ return PacketResult.IGNORED;
+ }
+ log.trace("Received a frame of size: {}", inPkt.getPacketData().length);
+ Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
+ if (formattedPak instanceof Ethernet) {
+ Object nextPak = formattedPak.getPayload();
+ if (nextPak instanceof IPv4) {
+ log.trace("Handle punted IP packet: {}", formattedPak);
+ handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector(), true);
+ }
+ }
+ return PacketResult.IGNORED;
+
+ }
+
+ private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector, boolean allowAddPending) {
+ InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
+ if (dIP == null || hostTracker == null) {
+ log.debug("Invalid param(s) in handlePuntedIPPacket.. DestIP: {}. hostTracker: {}", dIP, hostTracker);
+ return;
+ }
+ HostNodeConnector destHost = hostTracker.hostFind(dIP);
+ /*
+ * In cases when incoming and outgoing connectors are in the same node, there is no need
+ * to verify that there is a route. Because of that, we will only need routing.getRoute()
+ * if we know that src and dst nodes are different.
+ */
+ if (destHost != null
+ && (incomingNodeConnector.getNode().equals(destHost.getnodeconnectorNode()) ||
+ routing == null ||
+ routing.getRoute(incomingNodeConnector.getNode(), destHost.getnodeconnectorNode()) != null)) {
+
+ log.trace("Host {} is at {}", dIP, destHost.getnodeConnector());
+
+ // If SimpleForwarding is aware of this host, it will try to install
+ // a path. Forward packet until it's done.
+ if (dataPacketService != null) {
+
+ /*
+ * if we know where the host is and there's a path from where this
+ * packet was punted to where the host is, then attempt best effort delivery to the host
+ */
+ NodeConnector nc = destHost.getnodeConnector();
+ log.trace("Forwarding punted IP received at {} to {}", incomingNodeConnector, nc);
+ // re-encode the Ethernet packet (the parent of the IPv4 packet)
+ RawPacket rp = this.dataPacketService.encodeDataPacket(pkt.getParent());
+ rp.setOutgoingNodeConnector(nc);
+ this.dataPacketService.transmitDataPacket(rp);
+ }
+ } else if (allowAddPending) {
+ // If we made it here, let's hang on to the punted packet, with hopes that its destination
+ // will become available soon.
+ addToPendingPackets(dIP, pkt, incomingNodeConnector);
+ } else {
+ log.warn("Dropping punted IP packet received at {} to Host {}", incomingNodeConnector, dIP);
+ }
+ }
}