BUG-542 - adding overall statictics
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
index 6a2c8080ef377194346ac0917da3e2973789260f..99c7c016cf2670d08adf96426e6a3a8bff790cad 100644 (file)
@@ -19,7 +19,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +39,8 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     private ScheduledThreadPoolExecutor pool;
     private int poolSize = 10;
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+    private MessageSpy<DataContainer> messageSpy;
 
     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
         @Override
@@ -69,8 +73,15 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
      * prepare queue
      */
     public void init() {
-        processQueue = new LinkedBlockingQueue<>(100);
+        processQueue = new LinkedBlockingQueue<>(1000);
         pool = new ScheduledThreadPoolExecutor(poolSize);
+
+        ticketProcessorFactory = new TicketProcessorFactory<>();
+        ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+        ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+        ticketProcessorFactory.setVersionExtractor(versionExtractor);
+        ticketProcessorFactory.setSpy(messageSpy);
+
         TicketFinisher<DataObject> finisher = new TicketFinisher<>(
                 processQueue, popListenersMapping, registeredOutTypeExtractor);
         new Thread(finisher).start();
@@ -87,18 +98,22 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     public void push(OfHeader message, ConnectionConductor conductor) {
         push(message,conductor,QueueKeeper.QueueType.DEFAULT);
     }
-    
+
     @Override
     public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
         if(queueType == QueueKeeper.QueueType.DEFAULT) {
             TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
             ticket.setConductor(conductor);
             ticket.setMessage(message);
             LOG.debug("ticket scheduling: {}, ticket: {}",
                     message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
-            //TODO: block if queue limit reached
-            processQueue.add(ticket);
-            scheduleTicket(ticket);
+            try {
+                processQueue.put(ticket);
+                scheduleTicket(ticket);
+            } catch (InterruptedException e) {
+                LOG.warn("message enqueing interrupted", e);
+            }
         } else if (queueType == QueueKeeper.QueueType.UNORDERED){
             List<DataObject> processedMessages = translate(message,conductor);
             pop(processedMessages,conductor);
@@ -109,8 +124,8 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
      * @param ticket
      */
     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
-        pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor,
-                registeredSrcTypeExtractor, translatorMapping));
+        Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+        pool.execute(ticketProcessor);
     }
 
     /**
@@ -131,7 +146,14 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
         this.popListenersMapping = popListenersMapping;
     }
-    
+
+    /**
+     * @param messageSpy the messageSpy to set
+     */
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+
     private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
         List<DataObject> result = new ArrayList<>();
         Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
@@ -156,18 +178,29 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
                 }
                 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
                 if(translatorOutput != null) {
-                    result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
+                    result.addAll(translatorOutput);
+                }
+            }
+            if (messageSpy != null) {
+                messageSpy.spyIn(message);
+                for (DataObject outMsg : result) {
+                    messageSpy.spyOut(outMsg);
                 }
             }
         } else {
             LOG.warn("No translators for this message Type: {}", messageType);
+            messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
         }
         return result;
     }
-    
-    private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+
+    /**
+     * @param processedMessages
+     * @param conductor
+     */
+    private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
         for (DataObject msg : processedMessages) {
-            Class<? extends Object> registeredType = 
+            Class<? extends Object> registeredType =
                     registeredOutTypeExtractor.extractRegisteredType(msg);
             Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
             if (popListeners == null) {