import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private ScheduledThreadPoolExecutor pool;
private int poolSize = 10;
private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+ private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+ private MessageSpy<DataContainer> messageSpy;
private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
@Override
* prepare queue
*/
public void init() {
- processQueue = new LinkedBlockingQueue<>(100);
+ processQueue = new LinkedBlockingQueue<>(1000);
pool = new ScheduledThreadPoolExecutor(poolSize);
+
+ ticketProcessorFactory = new TicketProcessorFactory<>();
+ ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+ ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+ ticketProcessorFactory.setVersionExtractor(versionExtractor);
+ ticketProcessorFactory.setSpy(messageSpy);
+
TicketFinisher<DataObject> finisher = new TicketFinisher<>(
processQueue, popListenersMapping, registeredOutTypeExtractor);
new Thread(finisher).start();
public void push(OfHeader message, ConnectionConductor conductor) {
push(message,conductor,QueueKeeper.QueueType.DEFAULT);
}
-
+
@Override
public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+ messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
if(queueType == QueueKeeper.QueueType.DEFAULT) {
TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
ticket.setConductor(conductor);
ticket.setMessage(message);
LOG.debug("ticket scheduling: {}, ticket: {}",
message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
- //TODO: block if queue limit reached
- processQueue.add(ticket);
- scheduleTicket(ticket);
+ try {
+ processQueue.put(ticket);
+ scheduleTicket(ticket);
+ } catch (InterruptedException e) {
+ LOG.warn("message enqueing interrupted", e);
+ }
} else if (queueType == QueueKeeper.QueueType.UNORDERED){
List<DataObject> processedMessages = translate(message,conductor);
pop(processedMessages,conductor);
* @param ticket
*/
private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
- pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor,
- registeredSrcTypeExtractor, translatorMapping));
+ Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+ pool.execute(ticketProcessor);
}
/**
Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
this.popListenersMapping = popListenersMapping;
}
-
+
+ /**
+ * @param messageSpy the messageSpy to set
+ */
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+
private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
List<DataObject> result = new ArrayList<>();
Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
}
List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
if(translatorOutput != null) {
- result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
+ result.addAll(translatorOutput);
+ }
+ }
+ if (messageSpy != null) {
+ messageSpy.spyIn(message);
+ for (DataObject outMsg : result) {
+ messageSpy.spyOut(outMsg);
}
}
} else {
LOG.warn("No translators for this message Type: {}", messageType);
+ messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
}
return result;
}
-
- private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+
+ /**
+ * @param processedMessages
+ * @param conductor
+ */
+ private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
for (DataObject msg : processedMessages) {
- Class<? extends Object> registeredType =
+ Class<? extends Object> registeredType =
registeredOutTypeExtractor.extractRegisteredType(msg);
Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
if (popListeners == null) {