preparing QueueKeeper and message translation
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
index 0b0dbf891f449c1a16027a433607deaec1529288..84ec1beba7c4f5c78c2c96e8032f749305a96bbf 100644 (file)
@@ -17,30 +17,42 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageListener;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author mirehak
- * @param <T> result type
- *
  */
-public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
+public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
+    
+    private static final Logger LOG = LoggerFactory
+            .getLogger(QueueKeeperLightImpl.class);
     
-    private Set<PopListener<T>> listeners;
-    private BlockingQueue<Ticket<T>> processQueue;
+    private Set<PopListener<DataObject>> listeners;
+    private BlockingQueue<TicketResult<DataObject>> processQueue;
     private ScheduledThreadPoolExecutor pool;
     private int poolSize = 10;
-    private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping; 
+    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping;
+    
+    private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
+        @Override
+        public Short extractVersion(OfHeader message) {
+            return message.getVersion();
+        }
+    }; 
     
     /**
      * prepare queue
      */
     public void init() {
-        listeners = Collections.synchronizedSet(new HashSet<PopListener<T>>());
+        listeners = Collections.synchronizedSet(new HashSet<PopListener<DataObject>>());
         processQueue = new LinkedBlockingQueue<>(100);
         pool = new ScheduledThreadPoolExecutor(poolSize);
-        TicketFinisher<T> finisher = new TicketFinisher<>(processQueue, listeners);
+        TicketFinisher<DataObject> finisher = new TicketFinisher<>(processQueue, listeners);
         new Thread(finisher).start();
     }
     
@@ -52,11 +64,12 @@ public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
     }
 
     @Override
-    public void push(Class<? extends DataObject> registeredMessageType, DataObject message, ConnectionConductor conductor) {
-        TicketImpl<T> ticket = new TicketImpl<>();
+    public void push(Class<? extends OfHeader> registeredMessageType, OfHeader message, ConnectionConductor conductor) {
+        TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
         ticket.setConductor(conductor);
         ticket.setMessage(message);
         ticket.setRegisteredMessageType(registeredMessageType);
+        LOG.debug("ticket scheduling: {}, ticket: {}", registeredMessageType.getSimpleName(), System.identityHashCode(ticket));
         //TODO: block if queue limit reached 
         processQueue.add(ticket);
         scheduleTicket(ticket);
@@ -65,17 +78,17 @@ public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
     /**
      * @param ticket
      */
-    private void scheduleTicket(Ticket<T> ticket) {
-        pool.execute(TicketProcessorFactory.createProcessor(ticket, listenerMapping));
+    private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
+        pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor, translatorMapping));
     }
 
     @Override
-    public synchronized void addPopListener(PopListener<T> listener) {
+    public synchronized void addPopListener(PopListener<DataObject> listener) {
         listeners.add(listener);
     }
 
     @Override
-    public synchronized boolean removePopListener(PopListener<T> listener) {
+    public synchronized boolean removePopListener(PopListener<DataObject> listener) {
         return listeners.remove(listener);
     }
 
@@ -87,8 +100,8 @@ public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
     }
 
     @Override
-    public void setListenerMapping(
-            Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
-        this.listenerMapping = listenerMapping;
+    public void setTranslatorMapping(
+            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping) {
+        this.translatorMapping = translatorMapping;
     }
 }