BUG-542 - adding overall statictics
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
index 0b0dbf891f449c1a16027a433607deaec1529288..99c7c016cf2670d08adf96426e6a3a8bff790cad 100644 (file)
@@ -7,43 +7,86 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageListener;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author mirehak
- * @param <T> result type
- *
  */
-public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
-    
-    private Set<PopListener<T>> listeners;
-    private BlockingQueue<Ticket<T>> processQueue;
+public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(QueueKeeperLightImpl.class);
+
+    private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
+    private BlockingQueue<TicketResult<DataObject>> processQueue;
     private ScheduledThreadPoolExecutor pool;
     private int poolSize = 10;
-    private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping; 
-    
+    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+    private MessageSpy<DataContainer> messageSpy;
+
+    private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
+        @Override
+        public Short extractVersion(OfHeader message) {
+            return message.getVersion();
+        }
+    };
+
+    private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
+            new RegisteredTypeExtractor<OfHeader>() {
+        @SuppressWarnings("unchecked")
+        @Override
+        public Class<? extends OfHeader> extractRegisteredType(
+                OfHeader message) {
+            return (Class<? extends OfHeader>) message.getImplementedInterface();
+        }
+    };
+
+    private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
+            new RegisteredTypeExtractor<DataObject>() {
+        @SuppressWarnings("unchecked")
+        @Override
+        public Class<? extends DataObject> extractRegisteredType(
+                DataObject message) {
+            return (Class<? extends DataObject>) message.getImplementedInterface();
+        }
+    };
+
     /**
      * prepare queue
      */
     public void init() {
-        listeners = Collections.synchronizedSet(new HashSet<PopListener<T>>());
-        processQueue = new LinkedBlockingQueue<>(100);
+        processQueue = new LinkedBlockingQueue<>(1000);
         pool = new ScheduledThreadPoolExecutor(poolSize);
-        TicketFinisher<T> finisher = new TicketFinisher<>(processQueue, listeners);
+
+        ticketProcessorFactory = new TicketProcessorFactory<>();
+        ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+        ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+        ticketProcessorFactory.setVersionExtractor(versionExtractor);
+        ticketProcessorFactory.setSpy(messageSpy);
+
+        TicketFinisher<DataObject> finisher = new TicketFinisher<>(
+                processQueue, popListenersMapping, registeredOutTypeExtractor);
         new Thread(finisher).start();
     }
-    
+
     /**
      * stop processing queue
      */
@@ -52,43 +95,121 @@ public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
     }
 
     @Override
-    public void push(Class<? extends DataObject> registeredMessageType, DataObject message, ConnectionConductor conductor) {
-        TicketImpl<T> ticket = new TicketImpl<>();
-        ticket.setConductor(conductor);
-        ticket.setMessage(message);
-        ticket.setRegisteredMessageType(registeredMessageType);
-        //TODO: block if queue limit reached 
-        processQueue.add(ticket);
-        scheduleTicket(ticket);
+    public void push(OfHeader message, ConnectionConductor conductor) {
+        push(message,conductor,QueueKeeper.QueueType.DEFAULT);
+    }
+
+    @Override
+    public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+        if(queueType == QueueKeeper.QueueType.DEFAULT) {
+            TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+            ticket.setConductor(conductor);
+            ticket.setMessage(message);
+            LOG.debug("ticket scheduling: {}, ticket: {}",
+                    message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
+            try {
+                processQueue.put(ticket);
+                scheduleTicket(ticket);
+            } catch (InterruptedException e) {
+                LOG.warn("message enqueing interrupted", e);
+            }
+        } else if (queueType == QueueKeeper.QueueType.UNORDERED){
+            List<DataObject> processedMessages = translate(message,conductor);
+            pop(processedMessages,conductor);
+        }
     }
 
     /**
      * @param ticket
      */
-    private void scheduleTicket(Ticket<T> ticket) {
-        pool.execute(TicketProcessorFactory.createProcessor(ticket, listenerMapping));
+    private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
+        Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+        pool.execute(ticketProcessor);
+    }
+
+    /**
+     * @param poolSize the poolSize to set
+     */
+    public void setPoolSize(int poolSize) {
+        this.poolSize = poolSize;
     }
 
     @Override
-    public synchronized void addPopListener(PopListener<T> listener) {
-        listeners.add(listener);
+    public void setTranslatorMapping(
+            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+        this.translatorMapping = translatorMapping;
     }
 
     @Override
-    public synchronized boolean removePopListener(PopListener<T> listener) {
-        return listeners.remove(listener);
+    public void setPopListenersMapping(
+            Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
+        this.popListenersMapping = popListenersMapping;
     }
 
     /**
-     * @param poolSize the poolSize to set
+     * @param messageSpy the messageSpy to set
      */
-    public void setPoolSize(int poolSize) {
-        this.poolSize = poolSize;
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
     }
 
-    @Override
-    public void setListenerMapping(
-            Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
-        this.listenerMapping = listenerMapping;
+    private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
+        List<DataObject> result = new ArrayList<>();
+        Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
+        Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+        LOG.debug("translating message: {}", messageType.getSimpleName());
+
+        Short version = versionExtractor.extractVersion(message);
+        if (version == null) {
+           throw new IllegalArgumentException("version is NULL");
+        }
+        TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+        translators = translatorMapping.get(tKey);
+
+        LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+        if (translators != null) {
+            for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+                SwitchConnectionDistinguisher cookie = null;
+                // Pass cookie only for PACKT_IN
+                if (messageType.equals("PacketInMessage.class")) {
+                    cookie = conductor.getAuxiliaryKey();
+                }
+                List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+                if(translatorOutput != null) {
+                    result.addAll(translatorOutput);
+                }
+            }
+            if (messageSpy != null) {
+                messageSpy.spyIn(message);
+                for (DataObject outMsg : result) {
+                    messageSpy.spyOut(outMsg);
+                }
+            }
+        } else {
+            LOG.warn("No translators for this message Type: {}", messageType);
+            messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+        }
+        return result;
+    }
+
+    /**
+     * @param processedMessages
+     * @param conductor
+     */
+    private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
+        for (DataObject msg : processedMessages) {
+            Class<? extends Object> registeredType =
+                    registeredOutTypeExtractor.extractRegisteredType(msg);
+            Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+            if (popListeners == null) {
+                LOG.warn("no popListener registered for type {}"+registeredType);
+            } else {
+                for (PopListener<DataObject> consumer : popListeners) {
+                    consumer.onPop(msg);
+                }
+            }
+        }
     }
 }