X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Finternal%2FDataPacketMuxDemux.java;h=932ac4e20be37f8d5ea1234265054209ceda120f;hp=cc8bcfa6b9c7c25fe828a4c7d8346dc161df4405;hb=44a86821d69cd804b6b23b437e0b27136eaac2b5;hpb=481fd027d4f425f9e25bbb8af2b9e9b8d0e3ad03 diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DataPacketMuxDemux.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DataPacketMuxDemux.java index cc8bcfa6b9..932ac4e20b 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DataPacketMuxDemux.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DataPacketMuxDemux.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -10,6 +9,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -23,42 +23,50 @@ import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExtern import org.opendaylight.controller.protocol_plugin.openflow.core.IController; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; -import org.openflow.protocol.OFMessage; -import org.openflow.protocol.OFPacketIn; -import org.openflow.protocol.OFPacketOut; -import org.openflow.protocol.OFPort; -import org.openflow.protocol.OFType; -import org.openflow.protocol.action.OFAction; -import org.openflow.protocol.action.OFActionOutput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.opendaylight.controller.sal.connection.IPluginOutConnectionService; import org.opendaylight.controller.sal.core.ConstructionException; import org.opendaylight.controller.sal.core.ContainerFlow; +import org.opendaylight.controller.sal.core.IContainerAware; import org.opendaylight.controller.sal.core.IContainerListener; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.core.Property; import org.opendaylight.controller.sal.core.UpdateType; +import org.opendaylight.controller.sal.match.Match; +import org.opendaylight.controller.sal.packet.Ethernet; import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService; +import org.opendaylight.controller.sal.packet.PacketException; import org.opendaylight.controller.sal.packet.PacketResult; import org.opendaylight.controller.sal.packet.RawPacket; import org.opendaylight.controller.sal.utils.GlobalConstants; +import org.opendaylight.controller.sal.utils.HexEncode; +import org.opendaylight.controller.sal.utils.NetUtils; +import org.openflow.protocol.OFMessage; +import org.openflow.protocol.OFPacketIn; +import org.openflow.protocol.OFPacketOut; +import org.openflow.protocol.OFPort; +import org.openflow.protocol.OFType; +import org.openflow.protocol.action.OFAction; +import org.openflow.protocol.action.OFActionOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DataPacketMuxDemux implements IContainerListener, - IMessageListener, IDataPacketMux, IInventoryShimExternalListener { + IMessageListener, IDataPacketMux, IInventoryShimExternalListener, IContainerAware { protected static final Logger logger = LoggerFactory .getLogger(DataPacketMuxDemux.class); private IController controller = null; private ConcurrentMap swID2ISwitch = new ConcurrentHashMap(); // Gives a map between a Container and all the DataPacket listeners on SAL private ConcurrentMap pluginOutDataPacketServices = new ConcurrentHashMap(); - // Gives a map between a NodeConnector and the containers to which it belongs + // Gives a map between a NodeConnector and the containers to which it + // belongs private ConcurrentMap> nc2Container = new ConcurrentHashMap>(); // Gives a map between a Container and the FlowSpecs on it private ConcurrentMap> container2FlowSpecs = new ConcurrentHashMap>(); // Track local data packet listener private List iDataPacketListen = new CopyOnWriteArrayList(); + private IPluginOutConnectionService connectionOutService; void setIDataPacketListen(IDataPacketListen s) { if (this.iDataPacketListen != null) { @@ -127,6 +135,16 @@ public class DataPacketMuxDemux implements IContainerListener, } } + void setIPluginOutConnectionService(IPluginOutConnectionService s) { + connectionOutService = s; + } + + void unsetIPluginOutConnectionService(IPluginOutConnectionService s) { + if (connectionOutService == s) { + connectionOutService = null; + } + } + /** * Function called by the dependency manager when all the required * dependencies are satisfied @@ -137,9 +155,9 @@ public class DataPacketMuxDemux implements IContainerListener, } /** - * Function called by the dependency manager when at least one - * dependency become unsatisfied or when the component is shutting - * down because for example bundle is being stopped. + * Function called by the dependency manager when at least one dependency + * become unsatisfied or when the component is shutting down because for + * example bundle is being stopped. * */ void destroy() { @@ -155,16 +173,26 @@ public class DataPacketMuxDemux implements IContainerListener, @Override public void receive(ISwitch sw, OFMessage msg) { - if (sw == null || msg == null - || this.pluginOutDataPacketServices == null) { + if (sw == null || msg == null || this.pluginOutDataPacketServices == null) { // Something fishy, we cannot do anything - logger.debug("sw: {} and/or msg: {} and/or pluginOutDataPacketServices: {} is null!", - new Object[]{sw, msg, this.pluginOutDataPacketServices}); + logger.debug("sw: {} and/or msg: {} and/or pluginOutDataPacketServices: {} is null!", new Object[] { sw, + msg, this.pluginOutDataPacketServices }); return; } + + Long ofSwitchID = Long.valueOf(sw.getId()); + try { + Node n = new Node(Node.NodeIDType.OPENFLOW, ofSwitchID); + if (!connectionOutService.isLocal(n)) { + logger.debug("Connection service refused DataPacketMuxDemux receive {} {}", sw, msg); + return; + } + } catch (Exception e) { + return; + } + if (msg instanceof OFPacketIn) { OFPacketIn ofPacket = (OFPacketIn) msg; - Long ofSwitchID = Long.valueOf(sw.getId()); Short ofPortID = Short.valueOf(ofPacket.getInPort()); try { @@ -177,50 +205,72 @@ public class DataPacketMuxDemux implements IContainerListener, // pass the parsed packet simply because once the // packet infra is settled all the packets will passed // around as parsed and read-only - for (int i = 0; i < this.iDataPacketListen.size(); i++) { - IDataPacketListen s = this.iDataPacketListen.get(i); - if (s.receiveDataPacket(dataPacket).equals( - PacketResult.CONSUME)) { + for (IDataPacketListen s : this.iDataPacketListen) { + if (s.receiveDataPacket(dataPacket).equals(PacketResult.CONSUME)) { logger.trace("Consumed locally data packet"); return; } } - // Now dispatch the packet toward SAL at least for - // default container, we need to revisit this in a general - // slicing architecture refresh - IPluginOutDataPacketService defaultOutService = this.pluginOutDataPacketServices - .get(GlobalConstants.DEFAULT.toString()); - if (defaultOutService != null) { - defaultOutService.receiveDataPacket(dataPacket); - logger.trace("Dispatched to apps a frame of size: {} on container: {}", - ofPacket.getPacketData().length, GlobalConstants.DEFAULT.toString()); - } + boolean dispatched_to_container = false; + // Now check the mapping between nodeConnector and - // Container and later on optinally filter based on + // Container and later on optimally filter based on // flowSpec List containersRX = this.nc2Container.get(p); if (containersRX != null) { - for (int i = 0; i < containersRX.size(); i++) { - String container = containersRX.get(i); - IPluginOutDataPacketService s = this.pluginOutDataPacketServices - .get(container); - if (s != null) { - // TODO add filtering on a per-flowSpec base - s.receiveDataPacket(dataPacket); - logger.trace("Dispatched to apps a frame of size: {} on container: {}", - ofPacket.getPacketData().length, GlobalConstants.DEFAULT.toString()); - + Ethernet frame = new Ethernet(); + byte data[] = dataPacket.getPacketData(); + frame.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte); + Match packetMatch = frame.getMatch(); + for (String container : containersRX) { + boolean notify = true; + List containerFlows = this.container2FlowSpecs.get(container); + if (containerFlows != null) { + notify = false; + for (ContainerFlow cFlow : containerFlows) { + if (cFlow.allowsMatch(packetMatch)) { + notify = true; + break; + } + } + if (notify) { + IPluginOutDataPacketService s = this.pluginOutDataPacketServices.get(container); + if (s != null) { + s.receiveDataPacket(dataPacket); + if (logger.isTraceEnabled()) { + logger.trace( + "Dispatched to apps a frame of size: {}" + " on container: {}: {}", + new Object[] { ofPacket.getPacketData().length, container, + HexEncode.bytesToHexString(dataPacket.getPacketData()) }); + } + } + dispatched_to_container = true; + } + } + } + } + if (!dispatched_to_container) { + // Now dispatch the packet toward SAL for default container + IPluginOutDataPacketService defaultOutService = this.pluginOutDataPacketServices + .get(GlobalConstants.DEFAULT.toString()); + if (defaultOutService != null) { + defaultOutService.receiveDataPacket(dataPacket); + if (logger.isTraceEnabled()) { + logger.trace("Dispatched to apps a frame of size: {} on " + "container: {}: {}", + new Object[] { ofPacket.getPacketData().length, + GlobalConstants.DEFAULT.toString(), + HexEncode.bytesToHexString(dataPacket.getPacketData()) }); } } } - // This is supposed to be the catch all for all the // DataPacket hence we will assume it has been handled return; } catch (ConstructionException cex) { + } catch (PacketException e) { + logger.debug("Failed to deserialize raw packet: ", e.getMessage()); } - // If we reach this point something went wrong. return; } else { @@ -233,20 +283,26 @@ public class DataPacketMuxDemux implements IContainerListener, public void transmitDataPacket(RawPacket outPkt) { // Sanity check area if (outPkt == null) { - logger.debug("outPkt is null!"); + logger.debug("outPkt is null!"); return; } NodeConnector outPort = outPkt.getOutgoingNodeConnector(); if (outPort == null) { - logger.debug("outPort is null! outPkt: {}", outPkt); + logger.debug("outPort is null! outPkt: {}", outPkt); + return; + } + + if (!connectionOutService.isLocal(outPort.getNode())) { + logger.debug("data packets will not be sent to {} in a non-master controller", outPort.toString()); return; } + if (!outPort.getType().equals( NodeConnector.NodeConnectorIDType.OPENFLOW)) { // The output Port is not of type OpenFlow - logger.debug("outPort is not OF Type! outPort: {}", outPort); + logger.debug("outPort is not OF Type! outPort: {}", outPort); return; } @@ -257,7 +313,7 @@ public class DataPacketMuxDemux implements IContainerListener, if (sw == null) { // If we cannot get the controller descriptor we cannot even // send out the frame - logger.debug("swID: {} - sw is null!", swID); + logger.debug("swID: {} - sw is null!", swID); return; } @@ -265,45 +321,51 @@ public class DataPacketMuxDemux implements IContainerListener, // build action OFActionOutput action = new OFActionOutput().setPort(port); // build packet out - OFPacketOut po = new OFPacketOut().setBufferId( - OFPacketOut.BUFFER_ID_NONE).setInPort(OFPort.OFPP_NONE) + OFPacketOut po = new OFPacketOut() + .setBufferId(OFPacketOut.BUFFER_ID_NONE) .setActions(Collections.singletonList((OFAction) action)) .setActionsLength((short) OFActionOutput.MINIMUM_LENGTH); + if(outPkt.getIncomingNodeConnector() != null) { + po.setInPort((Short)outPkt.getIncomingNodeConnector().getID()); + } else { + po.setInPort(OFPort.OFPP_NONE); + } po.setLengthU(OFPacketOut.MINIMUM_LENGTH + po.getActionsLength() + data.length); po.setPacketData(data); - sw.asyncSend(po); + // send PACKET_OUT at high priority + sw.asyncFastSend(po); logger.trace("Transmitted a frame of size: {}", data.length); } public void addNode(Node node, Set props) { if (node == null) { - logger.debug("node is null!"); + logger.debug("node is null!"); return; - } + } long sid = (Long) node.getID(); ISwitch sw = controller.getSwitches().get(sid); if (sw == null) { - logger.debug("sid: {} - sw is null!", sid); - return; + logger.debug("sid: {} - sw is null!", sid); + return; } this.swID2ISwitch.put(sw.getId(), sw); } public void removeNode(Node node) { if (node == null) { - logger.debug("node is null!"); + logger.debug("node is null!"); return; } long sid = (Long) node.getID(); ISwitch sw = controller.getSwitches().get(sid); if (sw == null) { - logger.debug("sid: {} - sw is null!", sid); - return; + logger.debug("sid: {} - sw is null!", sid); + return; } this.swID2ISwitch.remove(sw.getId()); } @@ -328,14 +390,13 @@ public class DataPacketMuxDemux implements IContainerListener, } switch (t) { case ADDED: - if (!fSpecs.contains(previousFlow)) { - fSpecs.add(previousFlow); + if (!fSpecs.contains(currentFlow)) { + fSpecs.add(currentFlow); } + container2FlowSpecs.put(containerName, fSpecs); break; case REMOVED: - if (fSpecs.contains(previousFlow)) { - fSpecs.remove(previousFlow); - } + fSpecs.remove(currentFlow); break; case CHANGED: break; @@ -405,4 +466,29 @@ public class DataPacketMuxDemux implements IContainerListener, UpdateType type, Set props) { // do nothing } + + @Override + public void containerCreate(String containerName) { + // do nothing + } + + @Override + public void containerDestroy(String containerName) { + Set removeNodeConnectorSet = new HashSet(); + for (Map.Entry> entry : nc2Container.entrySet()) { + List ncContainers = entry.getValue(); + if (ncContainers.contains(containerName)) { + NodeConnector nodeConnector = entry.getKey(); + removeNodeConnectorSet.add(nodeConnector); + } + } + for (NodeConnector nodeConnector : removeNodeConnectorSet) { + List ncContainers = nc2Container.get(nodeConnector); + ncContainers.remove(containerName); + if (ncContainers.isEmpty()) { + nc2Container.remove(nodeConnector); + } + } + container2FlowSpecs.remove(containerName); + } }