Added an UNORDERED option to QueueKeeper 36/3736/3
authorEd Warnicke <eaw@cisco.com>
Sun, 15 Dec 2013 17:29:21 +0000 (09:29 -0800)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 16 Dec 2013 07:57:39 +0000 (07:57 +0000)
Not all packets require order be preserved, in particular
its actively harmful as a way to deal with PacketIn.

So... I added a QueueType enum {DEFAULT, UNORDERED} (so
we could in principle decide we need different queues in the future)

Kept the existing queueKeeper beahvior for existing invocations
Changed over PacketIn to using UNORDERED

Made PacketInTranslator a bit more robust.

Change-Id: Id835e9840ed26f9ed10e9090ce49ffaa429a002c
Signed-off-by: Ed Warnicke <eaw@cisco.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/PacketInTranslator.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeper.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java

index 551188333598386fbc13a9f98670ad4f08061a96..5c5271f56959e5c25dd25408f23083e000ffbea3 100644 (file)
@@ -198,7 +198,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void onPacketInMessage(PacketInMessage message) {
-        queueKeeper.push(message, this);
+        queueKeeper.push(message, this, QueueKeeper.QueueType.UNORDERED);
     }
 
     @Override
index 0c8b56994a0797d78f7fefbae06aed20bcfd302d..a93567d70e94a3879dd7e595048c5700188e6de1 100644 (file)
@@ -27,7 +27,7 @@ public class PacketInTranslator implements IMDMessageTranslator<OfHeader, List<D
     @Override
     public List<DataObject> translate(SwitchConnectionDistinguisher cookie,
             SessionContext sc, OfHeader msg) {
-        if(msg instanceof PacketInMessage) {
+        if(sc !=null && msg instanceof PacketInMessage) {
             PacketInMessage message = (PacketInMessage)msg;
             List<DataObject> list = new CopyOnWriteArrayList<DataObject>();
             LOG.info("PacketIn: InPort: {} Cookie: {} Match.type: {}",
@@ -41,50 +41,52 @@ public class PacketInTranslator implements IMDMessageTranslator<OfHeader, List<D
 
            // get the DPID
            GetFeaturesOutput features = sc.getFeatures();
-           BigInteger dpid = features.getDatapathId();
-
-           // get the Cookie if it exists
-           if(message.getCookie() != null) {
-               pktInBuilder.setCookie(new Cookie(message.getCookie().longValue()));
-           }
-
-           // extract the port number
-           Long port = null;
-
-           if (message.getInPort() != null) {
-               // this doesn't work--at least for OF1.3
-               port = message.getInPort().longValue();
-           }
-
-           // this should work for OF1.3
-           if (message.getMatch() != null && message.getMatch().getMatchEntries() != null) {
-               List<MatchEntries> entries = message.getMatch().getMatchEntries();
-               for (MatchEntries entry : entries) {
-                   PortNumberMatchEntry tmp = entry.getAugmentation(PortNumberMatchEntry.class);
-                   if (tmp != null) {
-                       if (port == null) {
-                           port = tmp.getPortNumber().getValue();
-                       } else {
-                           LOG.warn("Multiple input ports (at least {} and {})",
-                                    port, tmp.getPortNumber().getValue());
+           // Make sure we actually have features, some naughty switches start sending packetIn before they send us the FeatureReply
+           if ( features != null) {
+               BigInteger dpid = features.getDatapathId();
+    
+               // get the Cookie if it exists
+               if(message.getCookie() != null) {
+                   pktInBuilder.setCookie(new Cookie(message.getCookie().longValue()));
+               }
+    
+               // extract the port number
+               Long port = null;
+    
+               if (message.getInPort() != null) {
+                   // this doesn't work--at least for OF1.3
+                   port = message.getInPort().longValue();
+               }
+    
+               // this should work for OF1.3
+               if (message.getMatch() != null && message.getMatch().getMatchEntries() != null) {
+                   List<MatchEntries> entries = message.getMatch().getMatchEntries();
+                   for (MatchEntries entry : entries) {
+                       PortNumberMatchEntry tmp = entry.getAugmentation(PortNumberMatchEntry.class);
+                       if (tmp != null) {
+                           if (port == null) {
+                               port = tmp.getPortNumber().getValue();
+                           } else {
+                               LOG.warn("Multiple input ports (at least {} and {})",
+                                        port, tmp.getPortNumber().getValue());
+                           }
                        }
                    }
                }
-           }
-
-           if (port == null) {
-               // no incoming port, so drop the event
-               LOG.warn("Received packet_in, but couldn't find an input port");
-               return null;
-           }else{
-               LOG.info("Receive packet_in from {} on port {}", dpid, port);
-           }
-           pktInBuilder.setIngress(InventoryDataServiceUtil.nodeConnectorRefFromDatapathIdPortno(dpid,port));
-           PacketReceived pktInEvent = pktInBuilder.build();
-           list.add(pktInEvent);
-            return list;
-        } else {
-            return null;
-        }
+    
+               if (port == null) {
+                   // no incoming port, so drop the event
+                   LOG.warn("Received packet_in, but couldn't find an input port");
+                   return null;
+               }else{
+                   LOG.info("Receive packet_in from {} on port {}", dpid, port);
+               }
+               pktInBuilder.setIngress(InventoryDataServiceUtil.nodeConnectorRefFromDatapathIdPortno(dpid,port));
+               PacketReceived pktInEvent = pktInBuilder.build();
+               list.add(pktInEvent);
+                return list;
+           } 
+        } 
+        return null;
     }
 }
index d12f261adc4c251292ae8e01c2f23ecb6a6f7605..94f8bb5c860d4d4e2e2ef7b8d1fa6c8e421c5a6d 100644 (file)
@@ -21,6 +21,8 @@ import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
  * @param <OUT> result type
  */
 public interface QueueKeeper<IN, OUT> {
+    
+    public enum QueueType {DEFAULT, UNORDERED}
 
     /**
      * @param translatorMapping
@@ -32,6 +34,13 @@ public interface QueueKeeper<IN, OUT> {
      * @param conductor
      */
     void push(IN message, ConnectionConductor conductor);
+    
+    /**
+     * @param message
+     * @param conductor
+     * @param ordered - true if message order matters, false otherwise
+     */
+    void push(IN message, ConnectionConductor conductor, QueueType queueType);
 
     /**
      * @param popListenersMapping the popListenersMapping to set
index e038775ebf11637f16d226a937c173e13ea6fe33..6a2c8080ef377194346ac0917da3e2973789260f 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -16,6 +17,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -83,14 +85,24 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
 
     @Override
     public void push(OfHeader message, ConnectionConductor conductor) {
-        TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
-        ticket.setConductor(conductor);
-        ticket.setMessage(message);
-        LOG.debug("ticket scheduling: {}, ticket: {}",
-                message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
-        //TODO: block if queue limit reached
-        processQueue.add(ticket);
-        scheduleTicket(ticket);
+        push(message,conductor,QueueKeeper.QueueType.DEFAULT);
+    }
+    
+    @Override
+    public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+        if(queueType == QueueKeeper.QueueType.DEFAULT) {
+            TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+            ticket.setConductor(conductor);
+            ticket.setMessage(message);
+            LOG.debug("ticket scheduling: {}, ticket: {}",
+                    message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
+            //TODO: block if queue limit reached
+            processQueue.add(ticket);
+            scheduleTicket(ticket);
+        } else if (queueType == QueueKeeper.QueueType.UNORDERED){
+            List<DataObject> processedMessages = translate(message,conductor);
+            pop(processedMessages,conductor);
+        }
     }
 
     /**
@@ -119,4 +131,52 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
         this.popListenersMapping = popListenersMapping;
     }
+    
+    private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
+        List<DataObject> result = new ArrayList<>();
+        Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
+        Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+        LOG.debug("translating message: {}", messageType.getSimpleName());
+
+        Short version = versionExtractor.extractVersion(message);
+        if (version == null) {
+           throw new IllegalArgumentException("version is NULL");
+        }
+        TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+        translators = translatorMapping.get(tKey);
+
+        LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+        if (translators != null) {
+            for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+                SwitchConnectionDistinguisher cookie = null;
+                // Pass cookie only for PACKT_IN
+                if (messageType.equals("PacketInMessage.class")) {
+                    cookie = conductor.getAuxiliaryKey();
+                }
+                List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+                if(translatorOutput != null) {
+                    result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
+                }
+            }
+        } else {
+            LOG.warn("No translators for this message Type: {}", messageType);
+        }
+        return result;
+    }
+    
+    private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+        for (DataObject msg : processedMessages) {
+            Class<? extends Object> registeredType = 
+                    registeredOutTypeExtractor.extractRegisteredType(msg);
+            Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+            if (popListeners == null) {
+                LOG.warn("no popListener registered for type {}"+registeredType);
+            } else {
+                for (PopListener<DataObject> consumer : popListeners) {
+                    consumer.onPop(msg);
+                }
+            }
+        }
+    }
 }