X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fqueue%2FQueueProcessorLightImpl.java;h=60f6114b9b3f4f6d66989a24433972e10e910532;hb=436d3b53fa2293b6d11348732db9bcc371e80642;hp=ad62f014b185b028cb1ad4cfbdced93c5c4998fa;hpb=b0e8d777920a430776efe07c625637fe75204505;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java index ad62f014b1..60f6114b9b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java @@ -8,22 +8,26 @@ package org.opendaylight.openflowplugin.openflow.md.queue; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; +import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator; +import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle; +import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener; +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem; +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor; +import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy; +import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy.StatisticsGroup; import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; -import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; -import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; @@ -32,16 +36,16 @@ import org.slf4j.LoggerFactory; /** - * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase. - *
- * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) + * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase. + *
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners) - *
+ *
* Workflow: *
    *
  1. upon message push ticket is created and enqueued
  2. *
  3. available threads from internal pool translate the massage wrapped in ticket
  4. - *
  5. when translation of particular message is finished, result is set in future result of wrapping ticket
    + *
  6. when translation of particular message is finished, result is set in future result of wrapping ticket
    * (order of tickets in queue is not touched during translate) *
  7. *
  8. at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does: @@ -50,15 +54,15 @@ import org.slf4j.LoggerFactory; *
  9. invoke blocking {@link Future#get()} on the dequeued ticket
  10. *
  11. as soon as the result of translation is available, appropriate popListener is invoked
  12. *
- * and this way the order of messages is preserved and also multiple threads are used by translating + * and this way the order of messages is preserved and also multiple threads are used by translating * * - * - * + * + * */ public class QueueProcessorLightImpl implements QueueProcessor { - protected static final Logger LOG = LoggerFactory + private static final Logger LOG = LoggerFactory .getLogger(QueueProcessorLightImpl.class); private BlockingQueue> ticketQueue; @@ -66,7 +70,7 @@ public class QueueProcessorLightImpl implements QueueProcessor, Collection>> popListenersMapping; private Map>>> translatorMapping; private TicketProcessorFactory ticketProcessorFactory; @@ -82,18 +86,15 @@ public class QueueProcessorLightImpl implements QueueProcessor(ticketQueueCapacity); - messageSources = new ConcurrentSkipListSet<>( - new Comparator>() { - @Override - public int compare(QueueKeeper o1, - QueueKeeper o2) { - return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode()); - } - }); - - processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(ticketQueueCapacity), + /* + * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT? Can we figure out + * a better lifecycle? Why does this have to be a Set? + */ + messageSources = new CopyOnWriteArraySet<>(); + + processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(ticketQueueCapacity), "OFmsgProcessor"); // force blocking when pool queue is full processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @@ -107,15 +108,15 @@ public class QueueProcessorLightImpl implements QueueProcessor(1), "OFmsgHarvester"); - finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, + finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1), "OFmsgFinisher"); finisher = new TicketFinisherImpl( ticketQueue, popListenersMapping); finisherPool.execute(finisher); - + harvester = new QueueKeeperHarvester(this, messageSources); harvesterPool.execute(harvester); @@ -134,32 +135,32 @@ public class QueueProcessorLightImpl implements QueueProcessor queueItem) { - messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); + messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED); TicketImpl ticket = new TicketImpl<>(); ticket.setConductor(queueItem.getConnectionConductor()); ticket.setMessage(queueItem.getMessage()); ticket.setQueueType(queueItem.getQueueType()); - + LOG.trace("ticket scheduling: {}, ticket: {}", - queueItem.getMessage().getImplementedInterface().getSimpleName(), + queueItem.getMessage().getImplementedInterface().getSimpleName(), System.identityHashCode(queueItem)); scheduleTicket(ticket); } - - + + @Override public void directProcessQueueItem(QueueItem queueItem) { - messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); + messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED); TicketImpl ticket = new TicketImpl<>(); ticket.setConductor(queueItem.getConnectionConductor()); ticket.setMessage(queueItem.getMessage()); - + LOG.debug("ticket scheduling: {}, ticket: {}", - queueItem.getMessage().getImplementedInterface().getSimpleName(), + queueItem.getMessage().getImplementedInterface().getSimpleName(), System.identityHashCode(queueItem)); - + ticketProcessorFactory.createProcessor(ticket).run(); - + // publish notification finisher.firePopNotification(ticket.getDirectResult()); } @@ -219,21 +220,21 @@ public class QueueProcessorLightImpl implements QueueProcessor> queuePollRegistration = + MessageSourcePollRegistration> queuePollRegistration = new MessageSourcePollRegistration<>(this, queue); return queuePollRegistration; } - + @Override public boolean unregisterMessageSource(QueueKeeper queue) { return messageSources.remove(queue); } - + @Override public Collection> getMessageSources() { return messageSources; } - + @Override public HarvesterHandle getHarvesterHandle() { return harvester;