@Override
public List<DataObject> translate(SwitchConnectionDistinguisher cookie,
SessionContext sc, OfHeader msg) {
- if(msg instanceof PacketInMessage) {
+ if(sc !=null && msg instanceof PacketInMessage) {
PacketInMessage message = (PacketInMessage)msg;
List<DataObject> list = new CopyOnWriteArrayList<DataObject>();
LOG.info("PacketIn: InPort: {} Cookie: {} Match.type: {}",
// get the DPID
GetFeaturesOutput features = sc.getFeatures();
- BigInteger dpid = features.getDatapathId();
-
- // get the Cookie if it exists
- if(message.getCookie() != null) {
- pktInBuilder.setCookie(new Cookie(message.getCookie().longValue()));
- }
-
- // extract the port number
- Long port = null;
-
- if (message.getInPort() != null) {
- // this doesn't work--at least for OF1.3
- port = message.getInPort().longValue();
- }
-
- // this should work for OF1.3
- if (message.getMatch() != null && message.getMatch().getMatchEntries() != null) {
- List<MatchEntries> entries = message.getMatch().getMatchEntries();
- for (MatchEntries entry : entries) {
- PortNumberMatchEntry tmp = entry.getAugmentation(PortNumberMatchEntry.class);
- if (tmp != null) {
- if (port == null) {
- port = tmp.getPortNumber().getValue();
- } else {
- LOG.warn("Multiple input ports (at least {} and {})",
- port, tmp.getPortNumber().getValue());
+ // Make sure we actually have features, some naughty switches start sending packetIn before they send us the FeatureReply
+ if ( features != null) {
+ BigInteger dpid = features.getDatapathId();
+
+ // get the Cookie if it exists
+ if(message.getCookie() != null) {
+ pktInBuilder.setCookie(new Cookie(message.getCookie().longValue()));
+ }
+
+ // extract the port number
+ Long port = null;
+
+ if (message.getInPort() != null) {
+ // this doesn't work--at least for OF1.3
+ port = message.getInPort().longValue();
+ }
+
+ // this should work for OF1.3
+ if (message.getMatch() != null && message.getMatch().getMatchEntries() != null) {
+ List<MatchEntries> entries = message.getMatch().getMatchEntries();
+ for (MatchEntries entry : entries) {
+ PortNumberMatchEntry tmp = entry.getAugmentation(PortNumberMatchEntry.class);
+ if (tmp != null) {
+ if (port == null) {
+ port = tmp.getPortNumber().getValue();
+ } else {
+ LOG.warn("Multiple input ports (at least {} and {})",
+ port, tmp.getPortNumber().getValue());
+ }
}
}
}
- }
-
- if (port == null) {
- // no incoming port, so drop the event
- LOG.warn("Received packet_in, but couldn't find an input port");
- return null;
- }else{
- LOG.info("Receive packet_in from {} on port {}", dpid, port);
- }
- pktInBuilder.setIngress(InventoryDataServiceUtil.nodeConnectorRefFromDatapathIdPortno(dpid,port));
- PacketReceived pktInEvent = pktInBuilder.build();
- list.add(pktInEvent);
- return list;
- } else {
- return null;
- }
+
+ if (port == null) {
+ // no incoming port, so drop the event
+ LOG.warn("Received packet_in, but couldn't find an input port");
+ return null;
+ }else{
+ LOG.info("Receive packet_in from {} on port {}", dpid, port);
+ }
+ pktInBuilder.setIngress(InventoryDataServiceUtil.nodeConnectorRefFromDatapathIdPortno(dpid,port));
+ PacketReceived pktInEvent = pktInBuilder.build();
+ list.add(pktInEvent);
+ return list;
+ }
+ }
+ return null;
}
}
*/
package org.opendaylight.openflowplugin.openflow.md.queue;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataObject;
@Override
public void push(OfHeader message, ConnectionConductor conductor) {
- TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
- ticket.setConductor(conductor);
- ticket.setMessage(message);
- LOG.debug("ticket scheduling: {}, ticket: {}",
- message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
- //TODO: block if queue limit reached
- processQueue.add(ticket);
- scheduleTicket(ticket);
+ push(message,conductor,QueueKeeper.QueueType.DEFAULT);
+ }
+
+ @Override
+ public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+ if(queueType == QueueKeeper.QueueType.DEFAULT) {
+ TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+ ticket.setConductor(conductor);
+ ticket.setMessage(message);
+ LOG.debug("ticket scheduling: {}, ticket: {}",
+ message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
+ //TODO: block if queue limit reached
+ processQueue.add(ticket);
+ scheduleTicket(ticket);
+ } else if (queueType == QueueKeeper.QueueType.UNORDERED){
+ List<DataObject> processedMessages = translate(message,conductor);
+ pop(processedMessages,conductor);
+ }
}
/**
Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
this.popListenersMapping = popListenersMapping;
}
+
+ private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
+ List<DataObject> result = new ArrayList<>();
+ Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
+ Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+ LOG.debug("translating message: {}", messageType.getSimpleName());
+
+ Short version = versionExtractor.extractVersion(message);
+ if (version == null) {
+ throw new IllegalArgumentException("version is NULL");
+ }
+ TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+ translators = translatorMapping.get(tKey);
+
+ LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+ if (translators != null) {
+ for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+ SwitchConnectionDistinguisher cookie = null;
+ // Pass cookie only for PACKT_IN
+ if (messageType.equals("PacketInMessage.class")) {
+ cookie = conductor.getAuxiliaryKey();
+ }
+ List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+ if(translatorOutput != null) {
+ result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
+ }
+ }
+ } else {
+ LOG.warn("No translators for this message Type: {}", messageType);
+ }
+ return result;
+ }
+
+ private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+ for (DataObject msg : processedMessages) {
+ Class<? extends Object> registeredType =
+ registeredOutTypeExtractor.extractRegisteredType(msg);
+ Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+ if (popListeners == null) {
+ LOG.warn("no popListener registered for type {}"+registeredType);
+ } else {
+ for (PopListener<DataObject> consumer : popListeners) {
+ consumer.onPop(msg);
+ }
+ }
+ }
+ }
}