Bug 1185 :of-flow: push-mpls-action different in config and operational data store
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
index bd32de1de82a56793575671677cff6a9215f2f0f..04a7ad78e9a458c0504558738acdfde8dffaee6c 100644 (file)
@@ -7,42 +7,73 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
- * @author mirehak
+ * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * <br/>
+ * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)}) 
+ * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
+ * <br/>
+ * Workflow:
+ * <ol>
+ * <li>upon message push ticket is created and enqueued</li>
+ * <li>available threads from internal pool translate the massage wrapped in ticket</li>
+ * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
+ *     (order of tickets in queue is not touched during translate)
+ * </li>
+ * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
+ *    <ol>
+ *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
+ *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
+ *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
+ *    </ol>
+ *    and this way the order of messages is preserved and also multiple threads are used by translating 
+ * </li>
+ * </ol>
+ * 
+ * 
  */
 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
-    
+
     private static final Logger LOG = LoggerFactory
             .getLogger(QueueKeeperLightImpl.class);
-    
+
     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
     private BlockingQueue<TicketResult<DataObject>> processQueue;
     private ScheduledThreadPoolExecutor pool;
     private int poolSize = 10;
-    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping;
-    
+    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+    private MessageSpy<DataContainer> messageSpy;
+
     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
         @Override
         public Short extractVersion(OfHeader message) {
             return message.getVersion();
         }
     };
-    
-    private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor = 
+
+    private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
             new RegisteredTypeExtractor<OfHeader>() {
         @SuppressWarnings("unchecked")
         @Override
@@ -51,8 +82,8 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
             return (Class<? extends OfHeader>) message.getImplementedInterface();
         }
     };
-    
-    private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor = 
+
+    private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
             new RegisteredTypeExtractor<DataObject>() {
         @SuppressWarnings("unchecked")
         @Override
@@ -61,18 +92,25 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
             return (Class<? extends DataObject>) message.getImplementedInterface();
         }
     };
-    
+
     /**
      * prepare queue
      */
     public void init() {
-        processQueue = new LinkedBlockingQueue<>(100);
+        processQueue = new LinkedBlockingQueue<>(1000);
         pool = new ScheduledThreadPoolExecutor(poolSize);
+
+        ticketProcessorFactory = new TicketProcessorFactory<>();
+        ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+        ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+        ticketProcessorFactory.setVersionExtractor(versionExtractor);
+        ticketProcessorFactory.setSpy(messageSpy);
+
         TicketFinisher<DataObject> finisher = new TicketFinisher<>(
                 processQueue, popListenersMapping, registeredOutTypeExtractor);
         new Thread(finisher).start();
     }
-    
+
     /**
      * stop processing queue
      */
@@ -82,22 +120,36 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
 
     @Override
     public void push(OfHeader message, ConnectionConductor conductor) {
-        TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
-        ticket.setConductor(conductor);
-        ticket.setMessage(message);
-        LOG.debug("ticket scheduling: {}, ticket: {}", 
-                message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
-        //TODO: block if queue limit reached 
-        processQueue.add(ticket);
-        scheduleTicket(ticket);
+        push(message,conductor,QueueKeeper.QueueType.DEFAULT);
+    }
+
+    @Override
+    public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+        if(queueType == QueueKeeper.QueueType.DEFAULT) {
+            TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+            ticket.setConductor(conductor);
+            ticket.setMessage(message);
+            LOG.debug("ticket scheduling: {}, ticket: {}",
+                    message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
+            try {
+                processQueue.put(ticket);
+                scheduleTicket(ticket);
+            } catch (InterruptedException e) {
+                LOG.warn("message enqueing interrupted", e);
+            }
+        } else if (queueType == QueueKeeper.QueueType.UNORDERED){
+            List<DataObject> processedMessages = translate(message,conductor);
+            pop(processedMessages,conductor);
+        }
     }
 
     /**
      * @param ticket
      */
     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
-        pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor, 
-                registeredSrcTypeExtractor, translatorMapping));
+        Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+        pool.execute(ticketProcessor);
     }
 
     /**
@@ -109,13 +161,79 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
 
     @Override
     public void setTranslatorMapping(
-            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping) {
+            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
         this.translatorMapping = translatorMapping;
     }
-    
+
     @Override
     public void setPopListenersMapping(
             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
         this.popListenersMapping = popListenersMapping;
     }
+
+    /**
+     * @param messageSpy the messageSpy to set
+     */
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+
+    private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
+        List<DataObject> result = new ArrayList<>();
+        Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
+        Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+        LOG.debug("translating message: {}", messageType.getSimpleName());
+
+        Short version = versionExtractor.extractVersion(message);
+        if (version == null) {
+           throw new IllegalArgumentException("version is NULL");
+        }
+        TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+        translators = translatorMapping.get(tKey);
+
+        LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+        if (translators != null) {
+            for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+                SwitchConnectionDistinguisher cookie = null;
+                // Pass cookie only for PACKT_IN
+                if (messageType.equals("PacketInMessage.class")) {
+                    cookie = conductor.getAuxiliaryKey();
+                }
+                List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+                if(translatorOutput != null) {
+                    result.addAll(translatorOutput);
+                }
+            }
+            if (messageSpy != null) {
+                messageSpy.spyIn(message);
+                for (DataObject outMsg : result) {
+                    messageSpy.spyOut(outMsg);
+                }
+            }
+        } else {
+            LOG.warn("No translators for this message Type: {}", messageType);
+            messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+        }
+        return result;
+    }
+
+    /**
+     * @param processedMessages
+     * @param conductor
+     */
+    private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
+        for (DataObject msg : processedMessages) {
+            Class<? extends Object> registeredType =
+                    registeredOutTypeExtractor.extractRegisteredType(msg);
+            Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+            if (popListeners == null) {
+                LOG.warn("no popListener registered for type {}"+registeredType);
+            } else {
+                for (PopListener<DataObject> consumer : popListeners) {
+                    consumer.onPop(msg);
+                }
+            }
+        }
+    }
 }