BUG-542 - adding overall statictics
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
index 42c1cb9e59c197ab328367a0af69e2c63c314e2a..99c7c016cf2670d08adf96426e6a3a8bff790cad 100644 (file)
@@ -19,7 +19,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +40,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     private int poolSize = 10;
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
-    private MessageSpy<OfHeader, DataObject> messageSpy;
+    private MessageSpy<DataContainer> messageSpy;
 
     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
         @Override
@@ -71,7 +73,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
      * prepare queue
      */
     public void init() {
-        processQueue = new LinkedBlockingQueue<>();
+        processQueue = new LinkedBlockingQueue<>(1000);
         pool = new ScheduledThreadPoolExecutor(poolSize);
 
         ticketProcessorFactory = new TicketProcessorFactory<>();
@@ -99,15 +101,19 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
 
     @Override
     public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
         if(queueType == QueueKeeper.QueueType.DEFAULT) {
             TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
             ticket.setConductor(conductor);
             ticket.setMessage(message);
             LOG.debug("ticket scheduling: {}, ticket: {}",
                     message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
-            //TODO: block if queue limit reached
-            processQueue.add(ticket);
-            scheduleTicket(ticket);
+            try {
+                processQueue.put(ticket);
+                scheduleTicket(ticket);
+            } catch (InterruptedException e) {
+                LOG.warn("message enqueing interrupted", e);
+            }
         } else if (queueType == QueueKeeper.QueueType.UNORDERED){
             List<DataObject> processedMessages = translate(message,conductor);
             pop(processedMessages,conductor);
@@ -144,7 +150,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     /**
      * @param messageSpy the messageSpy to set
      */
-    public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
         this.messageSpy = messageSpy;
     }
 
@@ -172,20 +178,27 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
                 }
                 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
                 if(translatorOutput != null) {
-                    result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
+                    result.addAll(translatorOutput);
                 }
             }
             if (messageSpy != null) {
                 messageSpy.spyIn(message);
-                messageSpy.spyOut(result);
+                for (DataObject outMsg : result) {
+                    messageSpy.spyOut(outMsg);
+                }
             }
         } else {
             LOG.warn("No translators for this message Type: {}", messageType);
+            messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
         }
         return result;
     }
 
-    private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+    /**
+     * @param processedMessages
+     * @param conductor
+     */
+    private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
         for (DataObject msg : processedMessages) {
             Class<? extends Object> registeredType =
                     registeredOutTypeExtractor.extractRegisteredType(msg);