/** * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.openflow.md.queue; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase. *
 * There is an internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)})
 * dedicated to translation. Then there is a singleThreadPool dedicated to publishing (via popListeners).
 * <br/>
 * Workflow:
 * <ol>
 * <li>upon message push a ticket is created and enqueued</li>
 * <li>available threads from the internal pool translate the message wrapped in the ticket</li>
 * <li>when translation of a particular message is finished, the result is set in the future result
 *     of the wrapping ticket<br/>
 *     (order of tickets in queue is not touched during translate)
 * </li>
 * <li>at the end of the queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
 *    <ol>
 *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
 *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
 *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
 *    </ol>
 *    and this way the order of messages is preserved and also multiple threads are used for translating
 * </li>
 * </ol>
* * */ public class QueueKeeperLightImpl implements QueueKeeper { private static final Logger LOG = LoggerFactory .getLogger(QueueKeeperLightImpl.class); private Map, Collection>> popListenersMapping; private BlockingQueue> processQueue; private ScheduledThreadPoolExecutor pool; private int poolSize = 10; private Map>>> translatorMapping; private TicketProcessorFactory ticketProcessorFactory; private MessageSpy messageSpy; private VersionExtractor versionExtractor = new VersionExtractor() { @Override public Short extractVersion(OfHeader message) { return message.getVersion(); } }; private RegisteredTypeExtractor registeredSrcTypeExtractor = new RegisteredTypeExtractor() { @SuppressWarnings("unchecked") @Override public Class extractRegisteredType( OfHeader message) { return (Class) message.getImplementedInterface(); } }; private RegisteredTypeExtractor registeredOutTypeExtractor = new RegisteredTypeExtractor() { @SuppressWarnings("unchecked") @Override public Class extractRegisteredType( DataObject message) { return (Class) message.getImplementedInterface(); } }; /** * prepare queue */ public void init() { processQueue = new LinkedBlockingQueue<>(1000); pool = new ScheduledThreadPoolExecutor(poolSize); ticketProcessorFactory = new TicketProcessorFactory<>(); ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor); ticketProcessorFactory.setTranslatorMapping(translatorMapping); ticketProcessorFactory.setVersionExtractor(versionExtractor); ticketProcessorFactory.setSpy(messageSpy); TicketFinisher finisher = new TicketFinisher<>( processQueue, popListenersMapping, registeredOutTypeExtractor); new Thread(finisher).start(); } /** * stop processing queue */ public void shutdown() { pool.shutdown(); } @Override public void push(OfHeader message, ConnectionConductor conductor) { push(message,conductor,QueueKeeper.QueueType.DEFAULT); } @Override public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) { 
messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); if(queueType == QueueKeeper.QueueType.DEFAULT) { TicketImpl ticket = new TicketImpl<>(); ticket.setConductor(conductor); ticket.setMessage(message); LOG.debug("ticket scheduling: {}, ticket: {}", message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket)); try { processQueue.put(ticket); scheduleTicket(ticket); } catch (InterruptedException e) { LOG.warn("message enqueing interrupted", e); } } else if (queueType == QueueKeeper.QueueType.UNORDERED){ List processedMessages = translate(message,conductor); pop(processedMessages,conductor); } } /** * @param ticket */ private void scheduleTicket(Ticket ticket) { Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket); pool.execute(ticketProcessor); } /** * @param poolSize the poolSize to set */ public void setPoolSize(int poolSize) { this.poolSize = poolSize; } @Override public void setTranslatorMapping( Map>>> translatorMapping) { this.translatorMapping = translatorMapping; } @Override public void setPopListenersMapping( Map, Collection>> popListenersMapping) { this.popListenersMapping = popListenersMapping; } /** * @param messageSpy the messageSpy to set */ public void setMessageSpy(MessageSpy messageSpy) { this.messageSpy = messageSpy; } private List translate(OfHeader message, ConnectionConductor conductor) { List result = new ArrayList<>(); Class messageType = registeredSrcTypeExtractor.extractRegisteredType(message); Collection>> translators = null; LOG.debug("translating message: {}", messageType.getSimpleName()); Short version = versionExtractor.extractVersion(message); if (version == null) { throw new IllegalArgumentException("version is NULL"); } TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); translators = translatorMapping.get(tKey); LOG.debug("translatorKey: {} + {}", version, messageType.getName()); if (translators != null) { for (IMDMessageTranslator> translator : 
translators) { SwitchConnectionDistinguisher cookie = null; // Pass cookie only for PACKT_IN if (messageType.equals("PacketInMessage.class")) { cookie = conductor.getAuxiliaryKey(); } List translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message); if(translatorOutput != null) { result.addAll(translatorOutput); } } if (messageSpy != null) { messageSpy.spyIn(message); for (DataObject outMsg : result) { messageSpy.spyOut(outMsg); } } } else { LOG.warn("No translators for this message Type: {}", messageType); messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); } return result; } /** * @param processedMessages * @param conductor */ private void pop(List processedMessages, ConnectionConductor conductor) { for (DataObject msg : processedMessages) { Class registeredType = registeredOutTypeExtractor.extractRegisteredType(msg); Collection> popListeners = popListenersMapping.get(registeredType); if (popListeners == null) { LOG.warn("no popListener registered for type {}"+registeredType); } else { for (PopListener consumer : popListeners) { consumer.onPop(msg); } } } } }