*/
package org.opendaylight.openflowplugin.openflow.md.queue;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
- * @author mirehak
+ * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * <br/>
+ * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)})
+ * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
+ * <br/>
+ * Workflow:
+ * <ol>
+ * <li>upon message push ticket is created and enqueued</li>
+ * <li>available threads from internal pool translate the massage wrapped in ticket</li>
+ * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
+ * (order of tickets in queue is not touched during translate)
+ * </li>
+ * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
+ * <ol>
+ * <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
+ * <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
+ * <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
+ * </ol>
+ * and this way the order of messages is preserved and also multiple threads are used by translating
+ * </li>
+ * </ol>
+ *
+ *
*/
public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(QueueKeeperLightImpl.class);
-
+
private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
private BlockingQueue<TicketResult<DataObject>> processQueue;
private ScheduledThreadPoolExecutor pool;
private int poolSize = 10;
- private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping;
-
+ private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+ private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+ private MessageSpy<DataContainer> messageSpy;
+
private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
@Override
public Short extractVersion(OfHeader message) {
return message.getVersion();
}
};
-
- private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
+
+ private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
new RegisteredTypeExtractor<OfHeader>() {
@SuppressWarnings("unchecked")
@Override
return (Class<? extends OfHeader>) message.getImplementedInterface();
}
};
-
- private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
+
+ private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
new RegisteredTypeExtractor<DataObject>() {
@SuppressWarnings("unchecked")
@Override
return (Class<? extends DataObject>) message.getImplementedInterface();
}
};
-
+
/**
* prepare queue
*/
public void init() {
- processQueue = new LinkedBlockingQueue<>(100);
+ processQueue = new LinkedBlockingQueue<>(1000);
pool = new ScheduledThreadPoolExecutor(poolSize);
+
+ ticketProcessorFactory = new TicketProcessorFactory<>();
+ ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+ ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+ ticketProcessorFactory.setVersionExtractor(versionExtractor);
+ ticketProcessorFactory.setSpy(messageSpy);
+
TicketFinisher<DataObject> finisher = new TicketFinisher<>(
processQueue, popListenersMapping, registeredOutTypeExtractor);
new Thread(finisher).start();
}
-
+
/**
* stop processing queue
*/
@Override
public void push(OfHeader message, ConnectionConductor conductor) {
- TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
- ticket.setConductor(conductor);
- ticket.setMessage(message);
- LOG.debug("ticket scheduling: {}, ticket: {}",
- message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
- //TODO: block if queue limit reached
- processQueue.add(ticket);
- scheduleTicket(ticket);
+ push(message,conductor,QueueKeeper.QueueType.DEFAULT);
+ }
+
+ @Override
+ public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+ messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+ if(queueType == QueueKeeper.QueueType.DEFAULT) {
+ TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+ ticket.setConductor(conductor);
+ ticket.setMessage(message);
+ LOG.debug("ticket scheduling: {}, ticket: {}",
+ message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
+ try {
+ processQueue.put(ticket);
+ scheduleTicket(ticket);
+ } catch (InterruptedException e) {
+ LOG.warn("message enqueing interrupted", e);
+ }
+ } else if (queueType == QueueKeeper.QueueType.UNORDERED){
+ List<DataObject> processedMessages = translate(message,conductor);
+ pop(processedMessages,conductor);
+ }
}
/**
* @param ticket
*/
private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
- pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor,
- registeredSrcTypeExtractor, translatorMapping));
+ Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+ pool.execute(ticketProcessor);
}
/**
@Override
public void setTranslatorMapping(
- Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping) {
+ Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
this.translatorMapping = translatorMapping;
}
-
+
@Override
public void setPopListenersMapping(
Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
this.popListenersMapping = popListenersMapping;
}
+
+ /**
+ * @param messageSpy the messageSpy to set
+ */
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+
+ private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
+ List<DataObject> result = new ArrayList<>();
+ Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
+ Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+ LOG.debug("translating message: {}", messageType.getSimpleName());
+
+ Short version = versionExtractor.extractVersion(message);
+ if (version == null) {
+ throw new IllegalArgumentException("version is NULL");
+ }
+ TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+ translators = translatorMapping.get(tKey);
+
+ LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+ if (translators != null) {
+ for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+ SwitchConnectionDistinguisher cookie = null;
+ // Pass cookie only for PACKT_IN
+ if (messageType.equals("PacketInMessage.class")) {
+ cookie = conductor.getAuxiliaryKey();
+ }
+ List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+ if(translatorOutput != null) {
+ result.addAll(translatorOutput);
+ }
+ }
+ if (messageSpy != null) {
+ messageSpy.spyIn(message);
+ for (DataObject outMsg : result) {
+ messageSpy.spyOut(outMsg);
+ }
+ }
+ } else {
+ LOG.warn("No translators for this message Type: {}", messageType);
+ messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ }
+ return result;
+ }
+
+ /**
+ * @param processedMessages
+ * @param conductor
+ */
+ private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
+ for (DataObject msg : processedMessages) {
+ Class<? extends Object> registeredType =
+ registeredOutTypeExtractor.extractRegisteredType(msg);
+ Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+ if (popListeners == null) {
+ LOG.warn("no popListener registered for type {}"+registeredType);
+ } else {
+ for (PopListener<DataObject> consumer : popListeners) {
+ consumer.onPop(msg);
+ }
+ }
+ }
+ }
}