Merge "Introducing the Modification classses"
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / DataPacketMuxDemux.java
index a8ebcb068b9399f0fee0c6b8dd6370e3b810b4d2..932ac4e20be37f8d5ea1234265054209ceda120f 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -22,31 +23,36 @@ import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExtern
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketIn;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.ContainerFlow;
+import org.opendaylight.controller.sal.core.IContainerAware;
 import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.match.Match;
+import org.opendaylight.controller.sal.packet.Ethernet;
 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
+import org.opendaylight.controller.sal.packet.PacketException;
 import org.opendaylight.controller.sal.packet.PacketResult;
 import org.opendaylight.controller.sal.packet.RawPacket;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.HexEncode;
+import org.opendaylight.controller.sal.utils.NetUtils;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPacketIn;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DataPacketMuxDemux implements IContainerListener,
-        IMessageListener, IDataPacketMux, IInventoryShimExternalListener {
+        IMessageListener, IDataPacketMux, IInventoryShimExternalListener, IContainerAware {
     protected static final Logger logger = LoggerFactory
             .getLogger(DataPacketMuxDemux.class);
     private IController controller = null;
@@ -60,6 +66,7 @@ public class DataPacketMuxDemux implements IContainerListener,
     private ConcurrentMap<String, List<ContainerFlow>> container2FlowSpecs = new ConcurrentHashMap<String, List<ContainerFlow>>();
     // Track local data packet listener
     private List<IDataPacketListen> iDataPacketListen = new CopyOnWriteArrayList<IDataPacketListen>();
+    private IPluginOutConnectionService connectionOutService;
 
     void setIDataPacketListen(IDataPacketListen s) {
         if (this.iDataPacketListen != null) {
@@ -128,10 +135,20 @@ public class DataPacketMuxDemux implements IContainerListener,
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
-     * 
+     *
      */
     void init() {
         this.controller.addMessageListener(OFType.PACKET_IN, this);
@@ -141,7 +158,7 @@ public class DataPacketMuxDemux implements IContainerListener,
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
      * example bundle is being stopped.
-     * 
+     *
      */
     void destroy() {
         this.controller.removeMessageListener(OFType.PACKET_IN, this);
@@ -156,17 +173,26 @@ public class DataPacketMuxDemux implements IContainerListener,
 
     @Override
     public void receive(ISwitch sw, OFMessage msg) {
-        if (sw == null || msg == null
-                || this.pluginOutDataPacketServices == null) {
+        if (sw == null || msg == null || this.pluginOutDataPacketServices == null) {
             // Something fishy, we cannot do anything
-            logger.debug(
-                    "sw: {} and/or msg: {} and/or pluginOutDataPacketServices: {} is null!",
-                    new Object[] { sw, msg, this.pluginOutDataPacketServices });
+            logger.debug("sw: {} and/or msg: {} and/or pluginOutDataPacketServices: {} is null!", new Object[] { sw,
+                    msg, this.pluginOutDataPacketServices });
+            return;
+        }
+
+        Long ofSwitchID = Long.valueOf(sw.getId());
+        try {
+            Node n = new Node(Node.NodeIDType.OPENFLOW, ofSwitchID);
+            if (!connectionOutService.isLocal(n)) {
+                logger.debug("Connection service refused DataPacketMuxDemux receive {} {}", sw, msg);
+                return;
+            }
+        } catch (Exception e) {
             return;
         }
+
         if (msg instanceof OFPacketIn) {
             OFPacketIn ofPacket = (OFPacketIn) msg;
-            Long ofSwitchID = Long.valueOf(sw.getId());
             Short ofPortID = Short.valueOf(ofPacket.getInPort());
 
             try {
@@ -179,61 +205,72 @@ public class DataPacketMuxDemux implements IContainerListener,
                 // pass the parsed packet simply because once the
                 // packet infra is settled all the packets will passed
                 // around as parsed and read-only
-                for (int i = 0; i < this.iDataPacketListen.size(); i++) {
-                    IDataPacketListen s = this.iDataPacketListen.get(i);
-                    if (s.receiveDataPacket(dataPacket).equals(
-                            PacketResult.CONSUME)) {
+                for (IDataPacketListen s : this.iDataPacketListen) {
+                      if (s.receiveDataPacket(dataPacket).equals(PacketResult.CONSUME)) {
                         logger.trace("Consumed locally data packet");
                         return;
                     }
                 }
 
-                // Now dispatch the packet toward SAL at least for
-                // default container, we need to revisit this in a general
-                // slicing architecture refresh
-                IPluginOutDataPacketService defaultOutService = this.pluginOutDataPacketServices
-                        .get(GlobalConstants.DEFAULT.toString());
-                if (defaultOutService != null) {
-                    defaultOutService.receiveDataPacket(dataPacket);
-                    logger.trace(
-                            "Dispatched to apps a frame of size: {} on container: {}: {}",
-                            new Object[] {
-                                    ofPacket.getPacketData().length,
-                                    GlobalConstants.DEFAULT.toString(),
-                                    HexEncode.bytesToHexString(dataPacket
-                                            .getPacketData()) });
-                }
+                boolean dispatched_to_container = false;
+
                 // Now check the mapping between nodeConnector and
                 // Container and later on optimally filter based on
                 // flowSpec
                 List<String> containersRX = this.nc2Container.get(p);
                 if (containersRX != null) {
-                    for (int i = 0; i < containersRX.size(); i++) {
-                        String container = containersRX.get(i);
-                        IPluginOutDataPacketService s = this.pluginOutDataPacketServices
-                                .get(container);
-                        if (s != null) {
-                            // TODO add filtering on a per-flowSpec base
-                            s.receiveDataPacket(dataPacket);
-                            logger.trace(
-                                    "Dispatched to apps a frame of size: {} on container: {}: {}",
-                                    new Object[] {
-                                            ofPacket.getPacketData().length,
-                                            GlobalConstants.DEFAULT.toString(),
-                                            HexEncode
-                                                    .bytesToHexString(dataPacket
-                                                            .getPacketData()) });
-
+                    Ethernet frame = new Ethernet();
+                    byte data[] = dataPacket.getPacketData();
+                    frame.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
+                    Match packetMatch = frame.getMatch();
+                    for (String container : containersRX) {
+                        boolean notify = true;
+                        List<ContainerFlow> containerFlows = this.container2FlowSpecs.get(container);
+                        if (containerFlows != null) {
+                            notify = false;
+                            for (ContainerFlow cFlow : containerFlows) {
+                                if (cFlow.allowsMatch(packetMatch)) {
+                                    notify = true;
+                                    break;
+                                }
+                            }
+                            if (notify) {
+                                IPluginOutDataPacketService s = this.pluginOutDataPacketServices.get(container);
+                                if (s != null) {
+                                    s.receiveDataPacket(dataPacket);
+                                    if (logger.isTraceEnabled()) {
+                                        logger.trace(
+                                                "Dispatched to apps a frame of size: {}" + " on container: {}: {}",
+                                                new Object[] { ofPacket.getPacketData().length, container,
+                                                        HexEncode.bytesToHexString(dataPacket.getPacketData()) });
+                                    }
+                                }
+                                dispatched_to_container = true;
+                            }
+                        }
+                    }
+                }
+                if (!dispatched_to_container) {
+                    // Now dispatch the packet toward SAL for default container
+                    IPluginOutDataPacketService defaultOutService = this.pluginOutDataPacketServices
+                        .get(GlobalConstants.DEFAULT.toString());
+                    if (defaultOutService != null) {
+                        defaultOutService.receiveDataPacket(dataPacket);
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Dispatched to apps a frame of size: {} on " + "container: {}: {}",
+                                         new Object[] { ofPacket.getPacketData().length,
+                                                        GlobalConstants.DEFAULT.toString(),
+                                                        HexEncode.bytesToHexString(dataPacket.getPacketData()) });
                         }
                     }
                 }
-
                 // This is supposed to be the catch all for all the
                 // DataPacket hence we will assume it has been handled
                 return;
             } catch (ConstructionException cex) {
+            } catch (PacketException e) {
+                logger.debug("Failed to deserialize raw packet: ", e.getMessage());
             }
-
             // If we reach this point something went wrong.
             return;
         } else {
@@ -256,6 +293,12 @@ public class DataPacketMuxDemux implements IContainerListener,
             return;
         }
 
+        if (!connectionOutService.isLocal(outPort.getNode())) {
+            logger.debug("data packets will not be sent to {} in a non-master controller", outPort.toString());
+            return;
+        }
+
+
         if (!outPort.getType().equals(
                 NodeConnector.NodeConnectorIDType.OPENFLOW)) {
             // The output Port is not of type OpenFlow
@@ -280,15 +323,20 @@ public class DataPacketMuxDemux implements IContainerListener,
         // build packet out
         OFPacketOut po = new OFPacketOut()
                 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
-                .setInPort(OFPort.OFPP_NONE)
                 .setActions(Collections.singletonList((OFAction) action))
                 .setActionsLength((short) OFActionOutput.MINIMUM_LENGTH);
+        if(outPkt.getIncomingNodeConnector() != null) {
+            po.setInPort((Short)outPkt.getIncomingNodeConnector().getID());
+        } else {
+            po.setInPort(OFPort.OFPP_NONE);
+        }
 
         po.setLengthU(OFPacketOut.MINIMUM_LENGTH + po.getActionsLength()
                 + data.length);
         po.setPacketData(data);
 
-        sw.asyncSend(po);
+        // send PACKET_OUT at high priority
+        sw.asyncFastSend(po);
         logger.trace("Transmitted a frame of size: {}", data.length);
     }
 
@@ -342,14 +390,13 @@ public class DataPacketMuxDemux implements IContainerListener,
         }
         switch (t) {
         case ADDED:
-            if (!fSpecs.contains(previousFlow)) {
-                fSpecs.add(previousFlow);
+            if (!fSpecs.contains(currentFlow)) {
+                fSpecs.add(currentFlow);
             }
+            container2FlowSpecs.put(containerName, fSpecs);
             break;
         case REMOVED:
-            if (fSpecs.contains(previousFlow)) {
-                fSpecs.remove(previousFlow);
-            }
+            fSpecs.remove(currentFlow);
             break;
         case CHANGED:
             break;
@@ -419,4 +466,29 @@ public class DataPacketMuxDemux implements IContainerListener,
             UpdateType type, Set<Property> props) {
         // do nothing
     }
+
+    @Override
+    public void containerCreate(String containerName) {
+        // do nothing
+    }
+
+    @Override
+    public void containerDestroy(String containerName) {
+        Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
+        for (Map.Entry<NodeConnector, List<String>> entry : nc2Container.entrySet()) {
+            List<String> ncContainers = entry.getValue();
+            if (ncContainers.contains(containerName)) {
+                NodeConnector nodeConnector = entry.getKey();
+                removeNodeConnectorSet.add(nodeConnector);
+            }
+        }
+        for (NodeConnector nodeConnector : removeNodeConnectorSet) {
+            List<String> ncContainers = nc2Container.get(nodeConnector);
+            ncContainers.remove(containerName);
+            if (ncContainers.isEmpty()) {
+                nc2Container.remove(nodeConnector);
+            }
+        }
+        container2FlowSpecs.remove(containerName);
+    }
 }