*/
package org.opendaylight.openflowplugin.openflow.md.queue;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageListener;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author mirehak
- *
+ * @param <IN>
+ * @param <OUT>
*/
-public abstract class TicketProcessorFactory {
+public class TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
protected static final Logger LOG = LoggerFactory
.getLogger(TicketProcessorFactory.class);
+ protected VersionExtractor<IN> versionExtractor;
+ protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
+ protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
+ protected MessageSpy<DataContainer> spy;
+
+ /**
+ * @param versionExtractor the versionExtractor to set
+ */
+ public void setVersionExtractor(VersionExtractor<IN> versionExtractor) {
+ this.versionExtractor = versionExtractor;
+ }
+
+ /**
+ * @param registeredTypeExtractor the registeredTypeExtractor to set
+ */
+ public void setRegisteredTypeExtractor(
+ RegisteredTypeExtractor<IN> registeredTypeExtractor) {
+ this.registeredTypeExtractor = registeredTypeExtractor;
+ }
+
+ /**
+ * @param translatorMapping the translatorMapping to set
+ */
+ public void setTranslatorMapping(
+ Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
+ this.translatorMapping = translatorMapping;
+ }
+
+ /**
+ * @param spy the spy to set
+ */
+ public void setSpy(MessageSpy<DataContainer> spy) {
+ this.spy = spy;
+ }
+
+
/**
* @param ticket
- * @param listenerMapping
* @return runnable ticket processor
*/
- public static <T> Runnable createProcessor(
- final Ticket<T> ticket,
- final Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
- return new Runnable() {
+ public Runnable createProcessor(final Ticket<IN, OUT> ticket) {
+
+ Runnable ticketProcessor = new Runnable() {
@Override
public void run() {
- // TODO: delegate processing of message - notify listeners
- LOG.debug("experimenter received, type: " + ticket.getRegisteredMessageType());
-
- notifyListener();
-
- ticket.getResult().set(null);
+ LOG.debug("message received, type: {}", registeredTypeExtractor.extractRegisteredType(
+ ticket.getMessage()).getSimpleName());
+ List<OUT> translate;
+ try {
+ translate = translate();
+ ticket.getResult().set(translate);
+ // spying on result
+ if (spy != null) {
+ spy.spyIn(ticket.getMessage());
+ for (OUT outMessage : ticket.getResult().get()) {
+ spy.spyOut(outMessage);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("translation problem: {}", e.getMessage());
+ ticket.getResult().setException(e);
+ }
+ LOG.debug("message processing done (type: {}, ticket: {})",
+ registeredTypeExtractor.extractRegisteredType(ticket.getMessage()).getSimpleName(),
+ System.identityHashCode(ticket));
}
/**
- * @param listenerMapping
+ *
*/
- private void notifyListener() {
- DataObject message = ticket.getMessage();
- Class<? extends DataObject> messageType = ticket.getRegisteredMessageType();
- Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
+ private List<OUT> translate() {
+ List<OUT> result = new ArrayList<>();
+
+ IN message = ticket.getMessage();
+ Class<? extends IN> messageType = registeredTypeExtractor.extractRegisteredType(ticket.getMessage());
ConnectionConductor conductor = ticket.getConductor();
+ Collection<IMDMessageTranslator<IN, List<OUT>>> translators = null;
+ LOG.debug("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket));
+
+ Short version = versionExtractor.extractVersion(message);
+ if (version == null) {
+ throw new IllegalArgumentException("version is NULL");
+ }
+ TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+ translators = translatorMapping.get(tKey);
- if (listeners != null) {
- for (IMDMessageListener listener : listeners) {
+ LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+ if (translators != null) {
+ for (IMDMessageTranslator<IN, List<OUT>> translator : translators) {
+ SwitchConnectionDistinguisher cookie = null;
// Pass cookie only for PACKT_IN
if (messageType.equals("PacketInMessage.class")) {
- listener.receive(conductor.getAuxiliaryKey(),
- conductor.getSessionContext(), message);
- } else {
- listener.receive(null, conductor.getSessionContext(), message);
+ cookie = conductor.getAuxiliaryKey();
+ }
+ long start = System.nanoTime();
+ List<OUT> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+ long end = System.nanoTime();
+ LOG.debug("translator: {} elapsed time {} ns",translator,end-start);
+ if(translatorOutput != null && !translatorOutput.isEmpty()) {
+ result.addAll(translatorOutput);
}
}
} else {
- LOG.warn("No listeners for this message Type {}", messageType);
+ LOG.warn("No translators for this message Type: {}", messageType);
}
+ return result;
}
};
+
+ return ticketProcessor;
}
}