Bug 1764 - moved Session related interfaces to openflowplugin-api
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueProcessorLightImpl.java
index ad62f014b185b028cb1ad4cfbdced93c5c4998fa..005237b027eb180e3f9b172dcface73432c6cc80 100644 (file)
@@ -8,22 +8,27 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
+import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
+import org.opendaylight.openflowplugin.api.statistics.MessageSpy.STATISTIC_GROUP;
+import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -32,9 +37,9 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
  * <br/>
- * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) 
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
  * <br/>
  * Workflow:
@@ -50,11 +55,11 @@ import org.slf4j.LoggerFactory;
  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
  *    </ol>
- *    and this way the order of messages is preserved and also multiple threads are used by translating 
+ *    and this way the order of messages is preserved and also multiple threads are used by translating
  * </li>
  * </ol>
- * 
- * 
+ *
+ *
  */
 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
 
@@ -66,7 +71,7 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
     private int processingPoolSize = 4;
     private ExecutorService harvesterPool;
     private ExecutorService finisherPool;
-    
+
     protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
@@ -82,18 +87,15 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
     public void init() {
         int ticketQueueCapacity = 1500;
         ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
-        messageSources = new ConcurrentSkipListSet<>(
-                new Comparator<QueueKeeper<OfHeader>>() {
-                    @Override
-                    public int compare(QueueKeeper<OfHeader> o1,
-                            QueueKeeper<OfHeader> o2) {
-                        return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode());
-                    }
-                });
-        
-        processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, 
-                TimeUnit.MILLISECONDS, 
-                new ArrayBlockingQueue<Runnable>(ticketQueueCapacity), 
+        /*
+         * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT?  Can we figure out
+         * a better lifecycle?  Why does this have to be a Set?
+         */
+        messageSources = new CopyOnWriteArraySet<>();
+
+        processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
+                TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
                 "OFmsgProcessor");
         // force blocking when pool queue is full
         processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@@ -107,15 +109,15 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
                 }
             }
         });
-        
-        harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
+
+        harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
-        finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
+        finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
         finisher = new TicketFinisherImpl(
                 ticketQueue, popListenersMapping);
         finisherPool.execute(finisher);
-        
+
         harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
         harvesterPool.execute(harvester);
 
@@ -139,27 +141,27 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
         ticket.setConductor(queueItem.getConnectionConductor());
         ticket.setMessage(queueItem.getMessage());
         ticket.setQueueType(queueItem.getQueueType());
-        
+
         LOG.trace("ticket scheduling: {}, ticket: {}",
-                queueItem.getMessage().getImplementedInterface().getSimpleName(), 
+                queueItem.getMessage().getImplementedInterface().getSimpleName(),
                 System.identityHashCode(queueItem));
         scheduleTicket(ticket);
     }
-    
-    
+
+
     @Override
     public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
         ticket.setConductor(queueItem.getConnectionConductor());
         ticket.setMessage(queueItem.getMessage());
-        
+
         LOG.debug("ticket scheduling: {}, ticket: {}",
-                queueItem.getMessage().getImplementedInterface().getSimpleName(), 
+                queueItem.getMessage().getImplementedInterface().getSimpleName(),
                 System.identityHashCode(queueItem));
-        
+
         ticketProcessorFactory.createProcessor(ticket).run();
-        
+
         // publish notification
         finisher.firePopNotification(ticket.getDirectResult());
     }
@@ -219,21 +221,21 @@ public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObj
         if (! added) {
             LOG.debug("registration of message source queue failed - already registered");
         }
-        MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration = 
+        MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
                 new MessageSourcePollRegistration<>(this, queue);
         return queuePollRegistration;
     }
-    
+
     @Override
     public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
         return messageSources.remove(queue);
     }
-    
+
     @Override
     public Collection<QueueKeeper<OfHeader>> getMessageSources() {
         return messageSources;
     }
-    
+
     @Override
     public HarvesterHandle getHarvesterHandle() {
         return harvester;