*/
package org.opendaylight.openflowplugin.openflow.md.queue;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageListener;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author mirehak
- * @param <T> result type
- *
*/
-public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
-
- private Set<PopListener<T>> listeners;
- private BlockingQueue<Ticket<T>> processQueue;
+public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(QueueKeeperLightImpl.class);
+
+ private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
+ private BlockingQueue<TicketResult<DataObject>> processQueue;
private ScheduledThreadPoolExecutor pool;
private int poolSize = 10;
- private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
-
+ private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+ private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+ private MessageSpy<DataContainer> messageSpy;
+
+ private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
+ @Override
+ public Short extractVersion(OfHeader message) {
+ return message.getVersion();
+ }
+ };
+
+ private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
+ new RegisteredTypeExtractor<OfHeader>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<? extends OfHeader> extractRegisteredType(
+ OfHeader message) {
+ return (Class<? extends OfHeader>) message.getImplementedInterface();
+ }
+ };
+
+ private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
+ new RegisteredTypeExtractor<DataObject>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<? extends DataObject> extractRegisteredType(
+ DataObject message) {
+ return (Class<? extends DataObject>) message.getImplementedInterface();
+ }
+ };
+
/**
* prepare queue
*/
public void init() {
- listeners = Collections.synchronizedSet(new HashSet<PopListener<T>>());
- processQueue = new LinkedBlockingQueue<>(100);
+ processQueue = new LinkedBlockingQueue<>(1000);
pool = new ScheduledThreadPoolExecutor(poolSize);
- TicketFinisher<T> finisher = new TicketFinisher<>(processQueue, listeners);
+
+ ticketProcessorFactory = new TicketProcessorFactory<>();
+ ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+ ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+ ticketProcessorFactory.setVersionExtractor(versionExtractor);
+ ticketProcessorFactory.setSpy(messageSpy);
+
+ TicketFinisher<DataObject> finisher = new TicketFinisher<>(
+ processQueue, popListenersMapping, registeredOutTypeExtractor);
new Thread(finisher).start();
}
-
+
/**
* stop processing queue
*/
}
@Override
- public void push(Class<? extends DataObject> registeredMessageType, DataObject message, ConnectionConductor conductor) {
- TicketImpl<T> ticket = new TicketImpl<>();
- ticket.setConductor(conductor);
- ticket.setMessage(message);
- ticket.setRegisteredMessageType(registeredMessageType);
- //TODO: block if queue limit reached
- processQueue.add(ticket);
- scheduleTicket(ticket);
+ public void push(OfHeader message, ConnectionConductor conductor) {
+ push(message,conductor,QueueKeeper.QueueType.DEFAULT);
+ }
+
+ @Override
+ public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+ messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+ if(queueType == QueueKeeper.QueueType.DEFAULT) {
+ TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+ ticket.setConductor(conductor);
+ ticket.setMessage(message);
+ LOG.debug("ticket scheduling: {}, ticket: {}",
+ message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
+ try {
+ processQueue.put(ticket);
+ scheduleTicket(ticket);
+ } catch (InterruptedException e) {
+ LOG.warn("message enqueing interrupted", e);
+ }
+ } else if (queueType == QueueKeeper.QueueType.UNORDERED){
+ List<DataObject> processedMessages = translate(message,conductor);
+ pop(processedMessages,conductor);
+ }
}
/**
* @param ticket
*/
- private void scheduleTicket(Ticket<T> ticket) {
- pool.execute(TicketProcessorFactory.createProcessor(ticket, listenerMapping));
+ private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
+ Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+ pool.execute(ticketProcessor);
+ }
+
+ /**
+ * @param poolSize the poolSize to set
+ */
+ public void setPoolSize(int poolSize) {
+ this.poolSize = poolSize;
}
@Override
- public synchronized void addPopListener(PopListener<T> listener) {
- listeners.add(listener);
+ public void setTranslatorMapping(
+ Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+ this.translatorMapping = translatorMapping;
}
@Override
- public synchronized boolean removePopListener(PopListener<T> listener) {
- return listeners.remove(listener);
+ public void setPopListenersMapping(
+ Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
+ this.popListenersMapping = popListenersMapping;
}
/**
- * @param poolSize the poolSize to set
+ * @param messageSpy the messageSpy to set
*/
- public void setPoolSize(int poolSize) {
- this.poolSize = poolSize;
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
}
- @Override
- public void setListenerMapping(
- Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
- this.listenerMapping = listenerMapping;
+ private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
+ List<DataObject> result = new ArrayList<>();
+ Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
+ Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+ LOG.debug("translating message: {}", messageType.getSimpleName());
+
+ Short version = versionExtractor.extractVersion(message);
+ if (version == null) {
+ throw new IllegalArgumentException("version is NULL");
+ }
+ TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+ translators = translatorMapping.get(tKey);
+
+ LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+ if (translators != null) {
+ for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+ SwitchConnectionDistinguisher cookie = null;
+ // Pass cookie only for PACKT_IN
+ if (messageType.equals("PacketInMessage.class")) {
+ cookie = conductor.getAuxiliaryKey();
+ }
+ List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+ if(translatorOutput != null) {
+ result.addAll(translatorOutput);
+ }
+ }
+ if (messageSpy != null) {
+ messageSpy.spyIn(message);
+ for (DataObject outMsg : result) {
+ messageSpy.spyOut(outMsg);
+ }
+ }
+ } else {
+ LOG.warn("No translators for this message Type: {}", messageType);
+ messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ }
+ return result;
+ }
+
+ /**
+ * @param processedMessages
+ * @param conductor
+ */
+ private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
+ for (DataObject msg : processedMessages) {
+ Class<? extends Object> registeredType =
+ registeredOutTypeExtractor.extractRegisteredType(msg);
+ Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+ if (popListeners == null) {
+ LOG.warn("no popListener registered for type {}"+registeredType);
+ } else {
+ for (PopListener<DataObject> consumer : popListeners) {
+ consumer.onPop(msg);
+ }
+ }
+ }
}
}