X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fqueue%2FQueueProcessorLightImpl.java;h=60f6114b9b3f4f6d66989a24433972e10e910532;hb=436d3b53fa2293b6d11348732db9bcc371e80642;hp=ad62f014b185b028cb1ad4cfbdced93c5c4998fa;hpb=b0e8d777920a430776efe07c625637fe75204505;p=openflowplugin.git
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java
index ad62f014b1..60f6114b9b 100644
--- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java
+++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java
@@ -8,22 +8,26 @@
package org.opendaylight.openflowplugin.openflow.md.queue;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy.StatisticsGroup;
import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -32,16 +36,16 @@ import org.slf4j.LoggerFactory;
/**
- * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
- *
- * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ *
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
* dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
- *
+ *
* Workflow:
*
* - upon message push ticket is created and enqueued
* - available threads from internal pool translate the massage wrapped in ticket
- * - when translation of particular message is finished, result is set in future result of wrapping ticket
+ *
- when translation of particular message is finished, result is set in future result of wrapping ticket
* (order of tickets in queue is not touched during translate)
*
* - at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
@@ -50,15 +54,15 @@ import org.slf4j.LoggerFactory;
*
- invoke blocking {@link Future#get()} on the dequeued ticket
* - as soon as the result of translation is available, appropriate popListener is invoked
*
- * and this way the order of messages is preserved and also multiple threads are used by translating
+ * and this way the order of messages is preserved and also multiple threads are used by translating
*
*
- *
- *
+ *
+ *
*/
public class QueueProcessorLightImpl implements QueueProcessor {
- protected static final Logger LOG = LoggerFactory
+ private static final Logger LOG = LoggerFactory
.getLogger(QueueProcessorLightImpl.class);
private BlockingQueue> ticketQueue;
@@ -66,7 +70,7 @@ public class QueueProcessorLightImpl implements QueueProcessor, Collection>> popListenersMapping;
private Map>>> translatorMapping;
private TicketProcessorFactory ticketProcessorFactory;
@@ -82,18 +86,15 @@ public class QueueProcessorLightImpl implements QueueProcessor(ticketQueueCapacity);
- messageSources = new ConcurrentSkipListSet<>(
- new Comparator>() {
- @Override
- public int compare(QueueKeeper o1,
- QueueKeeper o2) {
- return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode());
- }
- });
-
- processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(ticketQueueCapacity),
+ /*
+ * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT? Can we figure out
+ * a better lifecycle? Why does this have to be a Set?
+ */
+ messageSources = new CopyOnWriteArraySet<>();
+
+ processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue(ticketQueueCapacity),
"OFmsgProcessor");
// force blocking when pool queue is full
processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@@ -107,15 +108,15 @@ public class QueueProcessorLightImpl implements QueueProcessor(1), "OFmsgHarvester");
- finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
+ finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1), "OFmsgFinisher");
finisher = new TicketFinisherImpl(
ticketQueue, popListenersMapping);
finisherPool.execute(finisher);
-
+
harvester = new QueueKeeperHarvester(this, messageSources);
harvesterPool.execute(harvester);
@@ -134,32 +135,32 @@ public class QueueProcessorLightImpl implements QueueProcessor queueItem) {
- messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+ messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED);
TicketImpl ticket = new TicketImpl<>();
ticket.setConductor(queueItem.getConnectionConductor());
ticket.setMessage(queueItem.getMessage());
ticket.setQueueType(queueItem.getQueueType());
-
+
LOG.trace("ticket scheduling: {}, ticket: {}",
- queueItem.getMessage().getImplementedInterface().getSimpleName(),
+ queueItem.getMessage().getImplementedInterface().getSimpleName(),
System.identityHashCode(queueItem));
scheduleTicket(ticket);
}
-
-
+
+
@Override
public void directProcessQueueItem(QueueItem queueItem) {
- messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+ messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED);
TicketImpl ticket = new TicketImpl<>();
ticket.setConductor(queueItem.getConnectionConductor());
ticket.setMessage(queueItem.getMessage());
-
+
LOG.debug("ticket scheduling: {}, ticket: {}",
- queueItem.getMessage().getImplementedInterface().getSimpleName(),
+ queueItem.getMessage().getImplementedInterface().getSimpleName(),
System.identityHashCode(queueItem));
-
+
ticketProcessorFactory.createProcessor(ticket).run();
-
+
// publish notification
finisher.firePopNotification(ticket.getDirectResult());
}
@@ -219,21 +220,21 @@ public class QueueProcessorLightImpl implements QueueProcessor> queuePollRegistration =
+ MessageSourcePollRegistration> queuePollRegistration =
new MessageSourcePollRegistration<>(this, queue);
return queuePollRegistration;
}
-
+
@Override
public boolean unregisterMessageSource(QueueKeeper queue) {
return messageSources.remove(queue);
}
-
+
@Override
public Collection> getMessageSources() {
return messageSources;
}
-
+
@Override
public HarvesterHandle getHarvesterHandle() {
return harvester;