BUG-1075: ingress back pressure
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / TicketFinisher.java
index deeb4d938215527cbe9d30c9ccee40946d562e0d..0018b2da99e5086e30217836b758cdc5ae124f5d 100644 (file)
@@ -7,67 +7,22 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @param <OUT> result type
  *
  */
-public class TicketFinisher<OUT> implements Runnable {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(TicketFinisher.class);
-
-    private final BlockingQueue<TicketResult<OUT>> queue;
-    private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
-    private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
+public interface TicketFinisher<OUT> extends Runnable {
 
     /**
-     * @param queue
-     * @param popListenersMapping
-     * @param registeredOutTypeExtractor
+     * initiate shutdown of this worker
      */
-    public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
-            Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
-            RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
-        this.queue = queue;
-        this.popListenersMapping = popListenersMapping;
-        this.registeredOutTypeExtractor = registeredOutTypeExtractor;
-    }
-
-
-    @Override
-    public void run() {
-        while (true) {
-            try {
-                //TODO:: handle shutdown of queue
-                TicketResult<OUT> result = queue.take();
-                long before = System.nanoTime();
-                LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
-                List<OUT> processedMessages = result.getResult().get();
-                long after = System.nanoTime();
-                LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
-                for (OUT msg : processedMessages) {
-                    Class<? extends Object> registeredType =
-                            registeredOutTypeExtractor.extractRegisteredType(msg);
-                    Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
-                    if (popListeners == null) {
-                        LOG.warn("no popListener registered for type {}"+registeredType);
-                    } else {
-                        for (PopListener<OUT> consumer : popListeners) {
-                            consumer.onPop(msg);
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-    }
+    void finish();
+    
+    /**
+     * notify popListeners
+     * @param processedMessages
+     */
+    void firePopNotification(List<OUT> processedMessages);
 }