Merge "Bug 8873 - Bundle based reconciliation to enable bundling of messages"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueProcessorLightImpl.java
index ad62f014b185b028cb1ad4cfbdced93c5c4998fa..60f6114b9b3f4f6d66989a24433972e10e910532 100644 (file)
@@ -8,22 +8,26 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy.StatisticsGroup;
 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -32,16 +36,16 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
- * <br/>
- * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) 
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * <br>
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
- * <br/>
+ * <br>
  * Workflow:
  * <ol>
  * <li>upon message push ticket is created and enqueued</li>
  * <li>available threads from internal pool translate the massage wrapped in ticket</li>
- * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
+ * <li>when translation of particular message is finished, result is set in future result of wrapping ticket<br>
  *     (order of tickets in queue is not touched during translate)
  * </li>
  * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
@@ -50,15 +54,15 @@ import org.slf4j.LoggerFactory;
  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
  *    </ol>
- *    and this way the order of messages is preserved and also multiple threads are used by translating 
+ *    and this way the order of messages is preserved and also multiple threads are used by translating
  * </li>
  * </ol>
- * 
- * 
+ *
+ *
  */
 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
 
-    protected static final Logger LOG = LoggerFactory
+    private static final Logger LOG = LoggerFactory
             .getLogger(QueueProcessorLightImpl.class);
 
     private BlockingQueue<TicketResult<DataObject>> ticketQueue;
@@ -66,7 +70,7 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
     private int processingPoolSize = 4;
     private ExecutorService harvesterPool;
     private ExecutorService finisherPool;
-    
+
     protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
@@ -82,18 +86,15 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
     public void init() {
         int ticketQueueCapacity = 1500;
         ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
-        messageSources = new ConcurrentSkipListSet<>(
-                new Comparator<QueueKeeper<OfHeader>>() {
-                    @Override
-                    public int compare(QueueKeeper<OfHeader> o1,
-                            QueueKeeper<OfHeader> o2) {
-                        return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode());
-                    }
-                });
-        
-        processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, 
-                TimeUnit.MILLISECONDS, 
-                new ArrayBlockingQueue<Runnable>(ticketQueueCapacity), 
+        /*
+         * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT?  Can we figure out
+         * a better lifecycle?  Why does this have to be a Set?
+         */
+        messageSources = new CopyOnWriteArraySet<>();
+
+        processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
+                TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
                 "OFmsgProcessor");
         // force blocking when pool queue is full
         processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@@ -107,15 +108,15 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
                 }
             }
         });
-        
-        harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
+
+        harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
-        finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
+        finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
         finisher = new TicketFinisherImpl(
                 ticketQueue, popListenersMapping);
         finisherPool.execute(finisher);
-        
+
         harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
         harvesterPool.execute(harvester);
 
@@ -134,32 +135,32 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
 
     @Override
     public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
-        messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+        messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED);
         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
         ticket.setConductor(queueItem.getConnectionConductor());
         ticket.setMessage(queueItem.getMessage());
         ticket.setQueueType(queueItem.getQueueType());
-        
+
         LOG.trace("ticket scheduling: {}, ticket: {}",
-                queueItem.getMessage().getImplementedInterface().getSimpleName(), 
+                queueItem.getMessage().getImplementedInterface().getSimpleName(),
                 System.identityHashCode(queueItem));
         scheduleTicket(ticket);
     }
-    
-    
+
+
     @Override
     public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
-        messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+        messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED);
         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
         ticket.setConductor(queueItem.getConnectionConductor());
         ticket.setMessage(queueItem.getMessage());
-        
+
         LOG.debug("ticket scheduling: {}, ticket: {}",
-                queueItem.getMessage().getImplementedInterface().getSimpleName(), 
+                queueItem.getMessage().getImplementedInterface().getSimpleName(),
                 System.identityHashCode(queueItem));
-        
+
         ticketProcessorFactory.createProcessor(ticket).run();
-        
+
         // publish notification
         finisher.firePopNotification(ticket.getDirectResult());
     }
@@ -219,21 +220,21 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
         if (! added) {
             LOG.debug("registration of message source queue failed - already registered");
         }
-        MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration = 
+        MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
                 new MessageSourcePollRegistration<>(this, queue);
         return queuePollRegistration;
     }
-    
+
     @Override
     public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
         return messageSources.remove(queue);
     }
-    
+
     @Override
     public Collection<QueueKeeper<OfHeader>> getMessageSources() {
         return messageSources;
     }
-    
+
     @Override
     public HarvesterHandle getHarvesterHandle() {
         return harvester;